package org.oracle.okafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import javax.jms.JMSException;
import oracle.jms.AQjmsBytesMessage;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.oracle.okafka.clients.KafkaClient;
import org.oracle.okafka.clients.Metadata;
import org.oracle.okafka.clients.NetworkClient;
import org.oracle.okafka.clients.consumer.internals.SubscriptionState;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.internals.PartitionData;
import org.oracle.okafka.common.internals.SessionData;
import org.oracle.okafka.common.requests.CommitRequest;
import org.oracle.okafka.common.requests.CommitResponse;
import org.oracle.okafka.common.requests.ConnectMeRequest;
import org.oracle.okafka.common.requests.ConnectMeResponse;
import org.oracle.okafka.common.requests.FetchRequest;
import org.oracle.okafka.common.requests.FetchResponse;
import org.oracle.okafka.common.requests.JoinGroupRequest;
import org.oracle.okafka.common.requests.JoinGroupResponse;
import org.oracle.okafka.common.requests.OffsetResetRequest;
import org.oracle.okafka.common.requests.OffsetResetResponse;
import org.oracle.okafka.common.requests.SubscribeRequest;
import org.oracle.okafka.common.requests.SubscribeResponse;
import org.oracle.okafka.common.requests.SyncGroupRequest;
import org.oracle.okafka.common.requests.SyncGroupResponse;
import org.oracle.okafka.common.requests.UnsubscribeRequest;
import org.oracle.okafka.common.requests.UnsubscribeResponse;
import org.slf4j.Logger;

/* loaded from: input_file:org/oracle/okafka/clients/consumer/internals/ConsumerNetworkClient.class */
public class ConsumerNetworkClient {
    private static final int MAX_POLL_TIMEOUT_MS = 5000;
    private final Logger log;
    private final KafkaClient client;
    private final Metadata metadata;
    private final Time time;
    private final boolean autoCommitEnabled;
    private final int autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    private final long retryBackoffMs;
    private final int maxPollTimeoutMs;
    private final int requestTimeoutMs;
    private final int sesssionTimeoutMs;
    private final long defaultApiTimeoutMs;
    private final SubscriptionState subscriptions;
    private final List<ConsumerPartitionAssignor> assignors;
    String consumerGroupId;
    private final AQKafkaConsumer aqConsumer;
    private boolean rejoin = false;
    private boolean needsJoinPrepare = true;
    private SessionData sessionData = null;
    private final List<AQjmsBytesMessage> messages = new ArrayList();
    private Node currentSession = null;
    private Set<String> subscriptionSnapshot = new HashSet();

    public ConsumerNetworkClient(String str, LogContext logContext, KafkaClient kafkaClient, Metadata metadata, SubscriptionState subscriptionState, List<ConsumerPartitionAssignor> list, boolean z, int i, Time time, long j, int i2, int i3, int i4, long j2, AQKafkaConsumer aQKafkaConsumer) {
        this.consumerGroupId = str;
        this.log = logContext.logger(ConsumerNetworkClient.class);
        this.client = kafkaClient;
        this.metadata = metadata;
        this.subscriptions = subscriptionState;
        this.assignors = list;
        this.autoCommitEnabled = z;
        this.autoCommitIntervalMs = i;
        this.time = time;
        this.retryBackoffMs = j;
        this.maxPollTimeoutMs = Math.min(i3, MAX_POLL_TIMEOUT_MS);
        this.requestTimeoutMs = i2;
        this.sesssionTimeoutMs = i4;
        this.defaultApiTimeoutMs = j2;
        this.aqConsumer = aQKafkaConsumer;
        if (z) {
            this.nextAutoCommitDeadline = time.milliseconds() + i;
        }
    }

    public List<AQjmsBytesMessage> poll(long j) {
        long currentTimeMillis;
        long currentTimeMillis2 = System.currentTimeMillis();
        do {
            boolean z = false;
            this.messages.clear();
            Map<Node, String> pollableMap = getPollableMap();
            long milliseconds = this.time.milliseconds();
            RequestCompletionHandler requestCompletionHandler = new RequestCompletionHandler() { // from class: org.oracle.okafka.clients.consumer.internals.ConsumerNetworkClient.1
                public void onComplete(ClientResponse clientResponse) {
                }
            };
            this.log.debug("Polling for topics #" + pollableMap.entrySet().size());
            Iterator<Map.Entry<Node, String>> it = pollableMap.entrySet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Map.Entry<Node, String> next = it.next();
                Node key = next.getKey();
                this.log.debug("Fetch Records for topic " + next.getValue() + " from host " + key);
                if (this.client.ready(key, milliseconds)) {
                    ClientResponse send = this.client.send(createFetchRequest(key, next.getValue(), requestCompletionHandler, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j), milliseconds);
                    handleFetchResponse(send, j);
                    if (send.wasDisconnected()) {
                        z = true;
                    }
                } else {
                    this.log.debug("Failed to consume messages from node: {}", key);
                    if (this.currentSession != null && this.currentSession == key) {
                        this.currentSession = null;
                    }
                }
            }
            currentTimeMillis = System.currentTimeMillis() - currentTimeMillis2;
            if (!z) {
                break;
            }
        } while (currentTimeMillis < j);
        return this.messages;
    }

    private Map<Node, String> getPollableMap() {
        try {
            if (this.currentSession == null) {
                List<Node> convertToOracleNodes = NetworkClient.convertToOracleNodes(this.metadata.fetch().nodes());
                if (convertToOracleNodes.size() == 1) {
                    this.currentSession = convertToOracleNodes.get(0);
                    this.log.debug("Leader Node " + this.currentSession);
                    this.metadata.setLeader(this.currentSession);
                    return Collections.singletonMap(this.currentSession, this.subscriptionSnapshot.iterator().next());
                }
                Iterator<Node> it = convertToOracleNodes.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Node next = it.next();
                    if (this.client.isReady(next, 0L)) {
                        Node preferredNode = getPreferredNode(next, next.user(), this.subscriptionSnapshot.iterator().next(), this.consumerGroupId);
                        if (preferredNode == null) {
                            this.currentSession = next;
                        } else {
                            int id = preferredNode.id();
                            Iterator<Node> it2 = convertToOracleNodes.iterator();
                            while (true) {
                                if (!it2.hasNext()) {
                                    break;
                                }
                                Node next2 = it2.next();
                                if (next2.id() == id) {
                                    this.currentSession = next2;
                                    break;
                                }
                            }
                            if (this.currentSession == null) {
                                this.currentSession = preferredNode;
                                convertToOracleNodes.add(preferredNode);
                            }
                            for (Node node : convertToOracleNodes) {
                                if (node != this.currentSession && this.client.isReady(node, 0L)) {
                                    this.log.debug("Closing exta node: " + node);
                                    this.client.close(node);
                                }
                            }
                        }
                    }
                }
                if (this.currentSession == null) {
                    Cluster fetch = this.metadata.fetch();
                    if (fetch.controller() != null) {
                        this.currentSession = (Node) fetch.controller();
                    } else {
                        this.currentSession = convertToOracleNodes.get(0);
                    }
                    this.log.debug("There is no ready node available :using " + this.currentSession);
                } else {
                    this.log.debug("Leader for this metadata set to " + this.currentSession);
                    this.metadata.setLeader(this.currentSession);
                }
            }
            return Collections.singletonMap(this.currentSession, this.subscriptionSnapshot.iterator().next());
        } catch (NoSuchElementException e) {
            return Collections.emptyMap();
        }
    }

    private ClientRequest createFetchRequest(Node node, String str, RequestCompletionHandler requestCompletionHandler, int i) {
        return this.client.newClientRequest(node, new FetchRequest.Builder(str, i), this.time.milliseconds(), true, i, requestCompletionHandler);
    }

    private void handleFetchResponse(ClientResponse clientResponse, long j) {
        this.messages.addAll(((FetchResponse) clientResponse.responseBody()).getMessages());
        if (!clientResponse.wasDisconnected()) {
            joinGroupifNeeded(clientResponse, j);
            return;
        }
        this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), this.time.milliseconds());
        this.rejoin = true;
        this.currentSession = null;
        if (this.sessionData != null) {
            this.log.info("Invalidating database session " + this.sessionData.name + ". New one will get created.");
            this.sessionData.invalidSessionData();
        }
    }

    private void joinGroupifNeeded(ClientResponse clientResponse, long j) {
        try {
            Exception exception = ((FetchResponse) clientResponse.responseBody()).getException();
            long requestLatencyMs = clientResponse.requestLatencyMs();
            long milliseconds = this.time.milliseconds();
            while (requestLatencyMs < j) {
                if (!rejoinNeeded(exception)) {
                    return;
                }
                this.log.debug("JoinGroup Is Needed");
                if (this.needsJoinPrepare) {
                    this.log.debug("Revoking");
                    onJoinPrepare();
                    this.needsJoinPrepare = false;
                }
                this.log.debug("Sending Join Group Request to database via node " + clientResponse.destination());
                sendJoinGroupRequest(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())));
                this.log.debug("Join Group Response received");
                exception = null;
                long milliseconds2 = this.time.milliseconds();
                requestLatencyMs += milliseconds2 - milliseconds;
                milliseconds = milliseconds2;
            }
        } catch (Exception e) {
            this.log.error(e.getMessage(), e);
            throw e;
        }
    }

    private boolean rejoinNeeded(Exception exc) {
        if (exc == null || !(exc instanceof JMSException) || !((JMSException) exc).getLinkedException().getMessage().startsWith("ORA-24003")) {
            return this.rejoin;
        }
        this.log.debug("Join Group is needed");
        return true;
    }

    private void onJoinPrepare() {
        maybeAutoCommitOffsetsSync(this.time.milliseconds());
        ConsumerRebalanceListener rebalanceListener = this.subscriptions.rebalanceListener();
        this.log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
        try {
            rebalanceListener.onPartitionsRevoked(new HashSet(this.subscriptions.assignedPartitions()));
        } catch (InterruptException e) {
            throw e;
        } catch (Exception e2) {
            this.log.error("User provided listener {} failed on partition revocation", rebalanceListener.getClass().getName(), e2);
        }
        this.subscriptions.resetGroupSubscription();
    }

    private void sendJoinGroupRequest(Node node) {
        this.log.debug("Sending JoinGroup");
        SessionData sessionData = this.sessionData;
        if (sessionData == null || sessionData.isInvalid()) {
            String next = this.subscriptionSnapshot.iterator().next();
            sessionData = new SessionData(-1L, -1, node.user(), next, -1, null, -1, null, -1, -1, -1L);
            sessionData.addAssignedPartitions(new PartitionData(next, -1, -1, null, -1, -1, false));
        }
        long milliseconds = this.time.milliseconds();
        ClientRequest newClientRequest = this.client.newClientRequest(node, new JoinGroupRequest.Builder(sessionData), milliseconds, true);
        this.log.debug("Sending JoinGroup Request");
        ClientResponse send = this.client.send(newClientRequest, milliseconds);
        this.log.debug("Got JoinGroup Response, Handling Join Group Response");
        handleJoinGroupResponse(send);
        this.log.debug("Handled JoinGroup Response");
    }

    private void handleJoinGroupResponse(ClientResponse clientResponse) {
        JoinGroupResponse joinGroupResponse = (JoinGroupResponse) clientResponse.responseBody();
        if (!clientResponse.wasDisconnected()) {
            if (joinGroupResponse.leader() == 1) {
                this.log.debug("Invoking onJoinLeader ");
                onJoinLeader(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), joinGroupResponse);
                return;
            } else {
                this.log.debug("Invoking onJoinFollower ");
                onJoinFollower(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), joinGroupResponse);
                return;
            }
        }
        this.log.info("Join Group failed as connection to database was severed.");
        this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), this.time.milliseconds());
        this.rejoin = true;
        this.currentSession = null;
        if (this.sessionData != null) {
            this.log.info("Invalidating database session " + this.sessionData.name + ". New one will get created.");
            this.sessionData.invalidSessionData();
        }
    }

    private void onJoinFollower(Node node, JoinGroupResponse joinGroupResponse) {
        ArrayList arrayList = new ArrayList();
        String next = this.subscriptionSnapshot.iterator().next();
        SessionData sessionData = new SessionData(-1L, -1, node.user(), next, -1, null, -1, null, -1, -1, -1L);
        sessionData.addAssignedPartitions(new PartitionData(next, -1, -1, null, -1, -1, false));
        arrayList.add(sessionData);
        sendSyncGroupRequest(node, arrayList, joinGroupResponse.version());
    }

    private void onJoinLeader(Node node, JoinGroupResponse joinGroupResponse) {
        Map<String, SessionData> sessionData = joinGroupResponse.getSessionData();
        List<PartitionData> partitions = joinGroupResponse.partitions();
        ConsumerPartitionAssignor lookUpAssignor = lookUpAssignor();
        if (lookUpAssignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol.");
        }
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        String str = null;
        for (Map.Entry<String, SessionData> entry : sessionData.entrySet()) {
            String key = entry.getKey();
            if (str == null || !str.equals(key)) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(entry.getValue().getSubscribedTopics());
                hashMap.put(key, new ConsumerPartitionAssignor.Subscription(arrayList, (ByteBuffer) null));
                hashSet.addAll(arrayList);
            }
            str = key;
        }
        this.subscriptions.groupSubscribe(hashSet);
        this.metadata.setTopics(this.subscriptions.metadataTopics());
        Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = lookUpAssignor.assign(this.metadata.fetch(), new ConsumerPartitionAssignor.GroupSubscription(hashMap)).groupAssignment();
        this.log.debug("Invoking geAssignment");
        sendSyncGroupRequest(node, getAssignment(groupAssignment, sessionData, partitions, joinGroupResponse.version()), joinGroupResponse.version());
    }

    private void sendSyncGroupRequest(Node node, List<SessionData> list, int i) {
        long milliseconds = this.time.milliseconds();
        handleSyncGroupResponse(this.client.send(this.client.newClientRequest(node, new SyncGroupRequest.Builder(list, i), milliseconds, true), milliseconds));
    }

    private void handleSyncGroupResponse(ClientResponse clientResponse) {
        SyncGroupResponse syncGroupResponse = (SyncGroupResponse) clientResponse.responseBody();
        Exception exception = syncGroupResponse.getException();
        if (!clientResponse.wasDisconnected()) {
            if (exception == null) {
                onJoinComplete(syncGroupResponse.getSessionData());
                this.rejoin = false;
                this.needsJoinPrepare = true;
                this.sessionData = syncGroupResponse.getSessionData();
                return;
            }
            return;
        }
        this.log.info("Sync Group failed as connection to database was severed.");
        this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), this.time.milliseconds());
        this.rejoin = true;
        this.currentSession = null;
        if (this.sessionData != null) {
            this.log.info("Invalidating database session " + this.sessionData.name + ". New one will get created.");
            this.sessionData.invalidSessionData();
        }
    }

    protected void onJoinComplete(SessionData sessionData) {
        this.log.debug("OnJoinComplete Invoked");
        ArrayList arrayList = new ArrayList();
        for (PartitionData partitionData : sessionData.getAssignedPartitions()) {
            this.log.debug("Assigned PartitionData " + partitionData.toString());
            arrayList.add(partitionData.getTopicPartition());
        }
        this.subscriptions.assignFromSubscribed(arrayList);
        arrayList.stream().forEach(topicPartition -> {
            this.subscriptions.seek(topicPartition, 0L);
            this.subscriptions.completeValidation(topicPartition);
        });
        lookUpAssignor().onAssignment(new ConsumerPartitionAssignor.Assignment(arrayList, (ByteBuffer) null), new ConsumerGroupMetadata(sessionData.getSubscriberName(), sessionData.getVersion(), sessionData.name, Optional.of(sessionData.name)));
        this.nextAutoCommitDeadline = this.time.milliseconds() + this.autoCommitIntervalMs;
        ConsumerRebalanceListener rebalanceListener = this.subscriptions.rebalanceListener();
        this.log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
        try {
            rebalanceListener.onPartitionsAssigned(new HashSet(this.subscriptions.assignedPartitions()));
        } catch (Exception e) {
            this.log.error("User provided listener {} failed on partition assignment", rebalanceListener.getClass().getName(), e);
        } catch (InterruptException e2) {
            throw e2;
        }
    }

    private List<SessionData> getAssignment(Map<String, ConsumerPartitionAssignor.Assignment> map, Map<String, SessionData> map2, List<PartitionData> list, int i) {
        this.log.debug("Getting new assignment");
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator<SessionData> it = map2.values().iterator();
        while (it.hasNext()) {
            for (PartitionData partitionData : it.next().getAssignedPartitions()) {
                hashMap.put(partitionData.getTopicPartition(), partitionData);
            }
        }
        for (PartitionData partitionData2 : list) {
            hashMap2.put(partitionData2.getTopicPartition(), partitionData2);
        }
        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> entry : map.entrySet()) {
            SessionData sessionData = map2.get(entry.getKey());
            SessionData sessionData2 = new SessionData(sessionData.getSessionId(), sessionData.getInstanceId(), sessionData.getSchema(), sessionData.getSubscribedTopics(), sessionData.getQueueId(), sessionData.getSubscriberName(), sessionData.getSubscriberId(), sessionData.createTime, sessionData.getLeader(), i, sessionData.getAuditId());
            for (TopicPartition topicPartition : entry.getValue().partitions()) {
                if (hashMap.get(topicPartition) == null) {
                    sessionData2.addAssignedPartitions((PartitionData) hashMap2.get(topicPartition));
                } else {
                    sessionData2.addAssignedPartitions((PartitionData) hashMap.get(topicPartition));
                }
            }
            arrayList.add(sessionData2);
        }
        return arrayList;
    }

    private ConsumerPartitionAssignor lookUpAssignor() {
        if (this.assignors.size() == 0) {
            return null;
        }
        return this.assignors.get(0);
    }

    public boolean mayBeTriggerSubcription(long j) {
        if (this.subscriptions.subscription().equals(this.subscriptionSnapshot)) {
            return true;
        }
        boolean z = false;
        this.rejoin = true;
        String subscribableTopics = getSubscribableTopics();
        long milliseconds = this.time.milliseconds();
        Node leastLoadedNode = this.client.leastLoadedNode(milliseconds);
        if (leastLoadedNode == null || !this.client.ready(leastLoadedNode, milliseconds)) {
            this.log.error("Failed to subscribe to topic: {}", subscribableTopics);
            return false;
        }
        try {
            if (this.aqConsumer.getSubcriberCount(leastLoadedNode, subscribableTopics) < 1) {
                z = true;
            }
            if (!handleSubscribeResponse(this.client.send(this.client.newClientRequest(leastLoadedNode, new SubscribeRequest.Builder(subscribableTopics), milliseconds, true, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j, null), milliseconds))) {
                return false;
            }
            if (z && this.aqConsumer.getoffsetStartegy() == "earliest") {
                TopicPartition topicPartition = new TopicPartition(subscribableTopics, -1);
                HashMap hashMap = new HashMap();
                hashMap.put(topicPartition, -2L);
                return resetOffsetsSync(hashMap, j);
            }
            if (z && this.aqConsumer.getoffsetStartegy() == "none") {
                throw new ConfigException("No previous offset found for the consumer group");
            }
            return true;
        } catch (ConfigException e) {
            this.log.error("Exception while subscribing to the topic" + e.getMessage(), e);
            this.log.info("Closing the consumer due to exception : " + e.getMessage());
            throw new ConfigException("No previous offset found for the consumer group");
        } catch (Exception e2) {
            this.log.error("Exception while subscribing to the topic" + e2.getMessage(), e2);
            return true;
        }
    }

    public void maybeUpdateMetadata(long j) {
        Cluster fetch = this.metadata.fetch();
        long milliseconds = this.time.milliseconds();
        if (fetch.isBootstrapConfigured() || this.metadata.timeToNextUpdate(milliseconds) == 0) {
            int version = this.metadata.version();
            long j2 = 0;
            while (true) {
                long j3 = j2;
                if (j3 > j || this.metadata.version() > version || this.metadata.isClosed()) {
                    break;
                }
                long milliseconds2 = this.time.milliseconds();
                this.client.maybeUpdateMetadata(milliseconds);
                milliseconds = this.time.milliseconds();
                j2 = j3 + (milliseconds2 - milliseconds);
            }
            this.log.debug("Metadata updated:Current Metadata Version " + this.metadata.version());
        }
    }

    private boolean handleSubscribeResponse(ClientResponse clientResponse) {
        if (clientResponse.wasDisconnected()) {
            this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), this.time.milliseconds());
        }
        SubscribeResponse subscribeResponse = (SubscribeResponse) clientResponse.responseBody();
        if (subscribeResponse.getException() != null) {
            this.log.error("failed to subscribe to topic {}", subscribeResponse.getTopic());
            return false;
        }
        this.subscriptionSnapshot.add(subscribeResponse.getTopic());
        return true;
    }

    private String getSubscribableTopics() {
        return getSubscribedTopic();
    }

    private String getSubscribedTopic() {
        HashSet hashSet = new HashSet();
        for (String str : this.subscriptions.subscription()) {
            if (!this.subscriptionSnapshot.contains(str)) {
                hashSet.add(str);
                this.subscriptionSnapshot.clear();
            }
        }
        return (String) hashSet.iterator().next();
    }

    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> map, long j) throws Exception {
        try {
            this.log.debug("Sending synchronous commit of offsets: {} request", map);
            Map<Node, List<TopicPartition>> commitableNodes = getCommitableNodes(map);
            if (commitableNodes == null || commitableNodes.size() == 0) {
                this.log.debug("No offsets to commit. Return");
                return true;
            }
            ClientResponse send = this.client.send(this.client.newClientRequest(this.metadata.getLeader(), new CommitRequest.Builder(commitableNodes, map), this.time.milliseconds(), true), this.time.milliseconds());
            handleCommitResponse(send);
            if (((CommitResponse) send.responseBody()).error()) {
                throw ((CommitResponse) send.responseBody()).getResult().entrySet().iterator().next().getValue();
            }
            return true;
        } catch (Exception e) {
            this.log.error("Exception while committing messages " + e, e);
            throw e;
        }
    }

    private void handleCommitResponse(ClientResponse clientResponse) {
        CommitResponse commitResponse = (CommitResponse) clientResponse.responseBody();
        Map<Node, List<TopicPartition>> nodes = commitResponse.getNodes();
        Map<TopicPartition, OffsetAndMetadata> offsets = commitResponse.offsets();
        for (Map.Entry<Node, Exception> entry : commitResponse.getResult().entrySet()) {
            if (entry.getValue() == null) {
                for (TopicPartition topicPartition : nodes.get(entry.getKey())) {
                    this.log.debug("Commited to topic partiton: {} with  offset: {} ", topicPartition, offsets.get(topicPartition));
                    offsets.remove(topicPartition);
                }
                nodes.remove(entry.getKey());
            } else {
                for (TopicPartition topicPartition2 : nodes.get(entry.getKey())) {
                    this.log.error("Failed to commit to topic partiton: {} with  offset: {} ", topicPartition2, offsets.get(topicPartition2));
                }
            }
        }
    }

    private Map<Node, List<TopicPartition>> getCommitableNodes(Map<TopicPartition, OffsetAndMetadata> map) {
        HashMap hashMap = new HashMap();
        Cluster fetch = this.metadata.fetch();
        Node leader = this.metadata.getLeader();
        if (leader == null) {
            Iterator it = fetch.nodes().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                org.apache.kafka.common.Node node = (org.apache.kafka.common.Node) it.next();
                if (this.client.isReady((Node) node, 0L)) {
                    leader = (Node) node;
                    this.log.info("Leader Node not present. Picked first ready node: " + leader);
                    break;
                }
            }
        }
        this.log.debug("Sending Commit request to leader Node " + leader);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            if (this.client.ready(leader, this.time.milliseconds())) {
                List list = (List) hashMap.get(leader);
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(leader, list);
                }
                list.add(entry.getKey());
            } else {
                this.log.info("Failed to send commit as Leader node is not ready to send commit: " + leader);
                this.log.error("Failed to commit to topic partiton: {} with  offset: {} ", entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    public boolean resetOffsetsSync(Map<TopicPartition, Long> map, long j) {
        long milliseconds = this.time.milliseconds();
        Node leastLoadedNode = this.client.leastLoadedNode(milliseconds);
        if (leastLoadedNode == null || !this.client.ready(leastLoadedNode, milliseconds)) {
            return false;
        }
        return handleOffsetResetResponse(this.client.send(this.client.newClientRequest(leastLoadedNode, new OffsetResetRequest.Builder(map, 0L), milliseconds, true, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j, null), milliseconds), map);
    }

    public boolean handleOffsetResetResponse(ClientResponse clientResponse, Map<TopicPartition, Long> map) {
        Map<TopicPartition, Exception> offsetResetResponse = ((OffsetResetResponse) clientResponse.responseBody()).offsetResetResponse();
        HashSet hashSet = new HashSet();
        for (Map.Entry<TopicPartition, Exception> entry : offsetResetResponse.entrySet()) {
            Long l = map.get(entry.getKey());
            String l2 = l.longValue() == -2 ? "TO_EARLIEST" : l.longValue() == -1 ? "TO_LATEST" : Long.toString(l.longValue());
            if (entry.getValue() == null && entry.getKey().partition() != -1) {
                this.subscriptions.requestOffsetReset(entry.getKey(), (OffsetResetStrategy) null);
                this.subscriptions.seekValidated(entry.getKey(), new SubscriptionState.FetchPosition(0L));
                this.subscriptions.completeValidation(entry.getKey());
                this.log.trace("seek to offset {} for topicpartition  {} is successful", l2, entry.getKey());
            } else if (entry.getValue() != null) {
                if ((entry.getValue() instanceof SQLException) && ((SQLException) entry.getValue()).getErrorCode() == 25323) {
                    this.subscriptions.requestOffsetReset(entry.getKey(), (OffsetResetStrategy) null);
                } else {
                    hashSet.add(entry.getKey());
                }
                this.log.warn("Failed to update seek for topicpartition {} to offset {}", entry.getKey(), l2);
            }
        }
        this.subscriptions.requestFailed(hashSet, this.time.milliseconds() + this.retryBackoffMs);
        return true;
    }

    public void maybeAutoCommitOffsetsSync(long j) {
        if (!this.autoCommitEnabled || j < this.nextAutoCommitDeadline) {
            return;
        }
        this.nextAutoCommitDeadline = j + this.autoCommitIntervalMs;
        doCommitOffsetsSync();
    }

    public void clearSubscription() {
        this.subscriptionSnapshot.clear();
    }

    private void doCommitOffsetsSync() {
        try {
            commitOffsetsSync(this.subscriptions.allConsumed(), 0L);
        } catch (Exception e) {
        } finally {
            this.nextAutoCommitDeadline = this.time.milliseconds() + this.autoCommitIntervalMs;
        }
    }

    public void unsubscribe() {
        handleUnsubscribeResponse(this.client.send(this.client.newClientRequest(this.currentSession, new UnsubscribeRequest.Builder(), this.time.milliseconds(), true), this.time.milliseconds()));
    }

    private void handleUnsubscribeResponse(ClientResponse clientResponse) {
        if (!clientResponse.wasDisconnected()) {
            for (Map.Entry<String, Exception> entry : ((UnsubscribeResponse) clientResponse.responseBody()).response().entrySet()) {
                if (entry.getValue() != null) {
                    this.log.info("Failed to unsubscribe from topic: with exception: ", entry.getKey(), entry.getValue());
                } else {
                    this.log.info("Unsubscribed from topic: ", entry.getKey());
                }
            }
            return;
        }
        this.log.debug("handleUnsubscribeResponse : node in disconnected state\n");
        this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), this.time.milliseconds());
        this.rejoin = true;
        this.currentSession = null;
        if (this.sessionData != null) {
            this.log.debug("handleUnsubscribeResponse : Invalidating database session " + this.sessionData.name + ". New one will get created.\n");
            this.sessionData.invalidSessionData();
        }
    }

    public long timeToNextPoll(long j, long j2) {
        if (!this.autoCommitEnabled) {
            return j2;
        }
        if (j > this.nextAutoCommitDeadline) {
            return 0L;
        }
        return Math.min(this.nextAutoCommitDeadline - j, j2);
    }

    public void close(long j) throws Exception {
        KafkaException kafkaException = null;
        if (this.autoCommitEnabled) {
            try {
                commitOffsetsSync(this.subscriptions.allConsumed(), j);
            } catch (Exception e) {
                kafkaException = new KafkaException("failed to commit consumed messages", e);
            }
        }
        this.client.close();
        if (kafkaException != null) {
            throw kafkaException;
        }
    }

    private Node getPreferredNode(Node node, String str, String str2, String str3) {
        long milliseconds = this.time.milliseconds();
        ClientRequest newClientRequest = this.client.newClientRequest(node, new ConnectMeRequest.Builder(str, str2, str3), milliseconds, true);
        this.log.debug("Sending ConnectMe Request");
        ClientResponse send = this.client.send(newClientRequest, milliseconds);
        this.log.debug("Got ConnectMe response");
        Node preferredNode = ((ConnectMeResponse) send.responseBody()).getPreferredNode();
        this.log.debug("ConnectMe: PreferredNode " + preferredNode);
        return preferredNode;
    }
}
