package org.oracle.okafka.clients.consumer.internals;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import oracle.jms.AQjmsBytesMessage;
import org.oracle.okafka.clients.ClientRequest;
import org.oracle.okafka.clients.ClientResponse;
import org.oracle.okafka.clients.KafkaClient;
import org.oracle.okafka.clients.Metadata;
import org.oracle.okafka.clients.RequestCompletionHandler;
import org.oracle.okafka.clients.consumer.OffsetAndMetadata;
import org.oracle.okafka.common.Cluster;
import org.oracle.okafka.common.KafkaException;
import org.oracle.okafka.common.Node;
import org.oracle.okafka.common.TopicPartition;
import org.oracle.okafka.common.requests.CommitRequest;
import org.oracle.okafka.common.requests.CommitResponse;
import org.oracle.okafka.common.requests.FetchRequest;
import org.oracle.okafka.common.requests.FetchResponse;
import org.oracle.okafka.common.requests.OffsetResetRequest;
import org.oracle.okafka.common.requests.OffsetResetResponse;
import org.oracle.okafka.common.requests.SubscribeRequest;
import org.oracle.okafka.common.requests.SubscribeResponse;
import org.oracle.okafka.common.requests.UnsubscribeRequest;
import org.oracle.okafka.common.requests.UnsubscribeResponse;
import org.oracle.okafka.common.utils.LogContext;
import org.oracle.okafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:org/oracle/okafka/clients/consumer/internals/ConsumerNetworkClient.class */
public class ConsumerNetworkClient {
    private static final int MAX_POLL_TIMEOUT_MS = 5000;
    private final Logger log;
    private final KafkaClient client;
    private final Metadata metadata;
    private final Time time;
    private final boolean autoCommitEnabled;
    private final int autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    private final long retryBackoffMs;
    private final int maxPollTimeoutMs;
    private final int requestTimeoutMs;
    private final int sesssionTimeoutMs;
    private final long defaultApiTimeoutMs;
    private final SubscriptionState subscriptions;
    private final List<AQjmsBytesMessage> messages = new ArrayList();
    private Set<String> subcriptionSnapshot = new HashSet();

    public ConsumerNetworkClient(LogContext logContext, KafkaClient kafkaClient, Metadata metadata, SubscriptionState subscriptionState, boolean z, int i, Time time, long j, int i2, int i3, int i4, long j2) {
        this.log = logContext.logger(ConsumerNetworkClient.class);
        this.client = kafkaClient;
        this.metadata = metadata;
        this.subscriptions = subscriptionState;
        this.autoCommitEnabled = z;
        this.autoCommitIntervalMs = i;
        this.time = time;
        this.retryBackoffMs = j;
        this.maxPollTimeoutMs = Math.min(i3, MAX_POLL_TIMEOUT_MS);
        this.requestTimeoutMs = i2;
        this.sesssionTimeoutMs = i4;
        this.defaultApiTimeoutMs = j2;
        if (z) {
            this.nextAutoCommitDeadline = time.milliseconds() + i;
        }
    }

    public List<AQjmsBytesMessage> poll(long j) {
        this.messages.clear();
        Map<Node, String> pollableMap = getPollableMap();
        long milliseconds = this.time.milliseconds();
        RequestCompletionHandler requestCompletionHandler = new RequestCompletionHandler() { // from class: org.oracle.okafka.clients.consumer.internals.ConsumerNetworkClient.1
            @Override // org.oracle.okafka.clients.RequestCompletionHandler
            public void onComplete(ClientResponse clientResponse) {
            }
        };
        Iterator<Map.Entry<Node, String>> it = pollableMap.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<Node, String> next = it.next();
            Node key = next.getKey();
            if (this.client.ready(key, milliseconds)) {
                handleResponse(this.client.send(createFetchRequest(key, next.getValue(), requestCompletionHandler, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j), milliseconds));
            } else {
                this.log.info("Failed to consume messages from node: {}", key);
            }
        }
        return this.messages;
    }

    private Map<Node, String> getPollableMap() {
        try {
            return Collections.singletonMap(this.metadata.fetch().leader(), this.subcriptionSnapshot.iterator().next());
        } catch (NoSuchElementException e) {
            return Collections.emptyMap();
        }
    }

    private ClientRequest createFetchRequest(Node node, String str, RequestCompletionHandler requestCompletionHandler, int i) {
        return this.client.newClientRequest(node, new FetchRequest.Builder(str, i), this.time.milliseconds(), true, i, requestCompletionHandler);
    }

    private void handleResponse(ClientResponse clientResponse) {
        if (clientResponse.wasDisconnected()) {
            this.client.disconnected(clientResponse.destination(), this.time.milliseconds());
        }
        this.messages.addAll(((FetchResponse) clientResponse.responseBody()).getMessages());
    }

    public boolean mayBeTriggerSubcription(long j) {
        if (this.subscriptions.subscription().equals(this.subcriptionSnapshot)) {
            return true;
        }
        String subscribableTopics = getSubscribableTopics();
        long milliseconds = this.time.milliseconds();
        Node leastLoadedNode = this.client.leastLoadedNode(milliseconds);
        if (leastLoadedNode != null && this.client.ready(leastLoadedNode, milliseconds)) {
            return handleSubscribeResponse(this.client.send(this.client.newClientRequest(leastLoadedNode, new SubscribeRequest.Builder(subscribableTopics), milliseconds, true, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j, null), milliseconds));
        }
        this.log.error("Failed to subscribe to topic: {}", subscribableTopics);
        return false;
    }

    private boolean handleSubscribeResponse(ClientResponse clientResponse) {
        if (clientResponse.wasDisconnected()) {
            this.client.disconnected(clientResponse.destination(), this.time.milliseconds());
            this.metadata.requestUpdate();
        }
        SubscribeResponse subscribeResponse = (SubscribeResponse) clientResponse.responseBody();
        if (subscribeResponse.getException() != null) {
            this.log.error("failed to subscribe to topic {}", subscribeResponse.getTopic());
            return false;
        }
        this.subcriptionSnapshot.add(subscribeResponse.getTopic());
        return true;
    }

    private String getSubscribableTopics() {
        return getSubscribedTopic();
    }

    private String getSubscribedTopic() {
        HashSet hashSet = new HashSet();
        for (String str : this.subscriptions.subscription()) {
            if (!this.subcriptionSnapshot.contains(str)) {
                hashSet.add(str);
                this.subcriptionSnapshot.clear();
            }
        }
        return (String) hashSet.iterator().next();
    }

    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> map, long j) throws Exception {
        this.log.debug("Sending synchronous commit of offsets: {} request", map);
        ClientResponse send = this.client.send(this.client.newClientRequest(null, new CommitRequest.Builder(getCommitableNodes(map), map), this.time.milliseconds(), true), this.time.milliseconds());
        handleCommitResponse(send);
        if (((CommitResponse) send.responseBody()).error()) {
            throw ((CommitResponse) send.responseBody()).getResult().entrySet().iterator().next().getValue();
        }
        return true;
    }

    private void handleCommitResponse(ClientResponse clientResponse) {
        CommitResponse commitResponse = (CommitResponse) clientResponse.responseBody();
        Map<Node, List<TopicPartition>> nodes = commitResponse.getNodes();
        Map<TopicPartition, OffsetAndMetadata> offsets = commitResponse.offsets();
        for (Map.Entry<Node, Exception> entry : commitResponse.getResult().entrySet()) {
            if (entry.getValue() == null) {
                for (TopicPartition topicPartition : nodes.get(entry.getKey())) {
                    this.log.debug("Commited to topic partiton: {} with  offset: {} ", topicPartition, offsets.get(topicPartition));
                    offsets.remove(topicPartition);
                }
                nodes.remove(entry.getKey());
            } else {
                for (TopicPartition topicPartition2 : nodes.get(entry.getKey())) {
                    this.log.error("Failed to commit to topic partiton: {} with  offset: {} ", topicPartition2, offsets.get(topicPartition2));
                }
            }
        }
    }

    private Map<Node, List<TopicPartition>> getCommitableNodes(Map<TopicPartition, OffsetAndMetadata> map) {
        HashMap hashMap = new HashMap();
        Cluster fetch = this.metadata.fetch();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            if (this.client.ready(fetch.leader(), this.time.milliseconds())) {
                if (hashMap.get(fetch.leader()) == null) {
                    hashMap.put(fetch.leader(), new ArrayList());
                }
                ((List) hashMap.get(fetch.leader())).add(entry.getKey());
            } else {
                this.log.error("Failed to commit to topic partiton: {} with  offset: {} ", entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    public boolean resetOffsetsSync(Map<TopicPartition, Long> map, long j) {
        long milliseconds = this.time.milliseconds();
        Node leastLoadedNode = this.client.leastLoadedNode(milliseconds);
        if (leastLoadedNode == null || !this.client.ready(leastLoadedNode, milliseconds)) {
            return false;
        }
        return handleOffsetResetResponse(this.client.send(this.client.newClientRequest(leastLoadedNode, new OffsetResetRequest.Builder(map, 0L), milliseconds, true, ((long) this.requestTimeoutMs) < j ? this.requestTimeoutMs : (int) j, null), milliseconds), map);
    }

    public boolean handleOffsetResetResponse(ClientResponse clientResponse, Map<TopicPartition, Long> map) {
        Map<TopicPartition, Exception> offsetResetResponse = ((OffsetResetResponse) clientResponse.responseBody()).offsetResetResponse();
        HashSet hashSet = new HashSet();
        for (Map.Entry<TopicPartition, Exception> entry : offsetResetResponse.entrySet()) {
            Long l = map.get(entry.getKey());
            String l2 = l.longValue() == -2 ? "TO_EARLIEST" : l.longValue() == -1 ? "TO_LATEST" : Long.toString(l.longValue());
            if (entry.getValue() == null) {
                this.subscriptions.requestOffsetReset(entry.getKey(), null);
                this.log.trace("seek to offset {} for topicpartition  {} is successful", l2, entry.getKey());
            } else {
                if ((entry.getValue() instanceof SQLException) && ((SQLException) entry.getValue()).getErrorCode() == 25323) {
                    this.subscriptions.requestOffsetReset(entry.getKey(), null);
                } else {
                    hashSet.add(entry.getKey());
                }
                this.log.warn("Failed to update seek for topicpartition {} to offset {}", entry.getKey(), l2);
            }
        }
        this.subscriptions.resetFailed(hashSet, this.time.milliseconds() + this.retryBackoffMs);
        return true;
    }

    public void maybeAutoCommitOffsetsSync(long j) {
        if (!this.autoCommitEnabled || j < this.nextAutoCommitDeadline) {
            return;
        }
        this.nextAutoCommitDeadline = j + this.autoCommitIntervalMs;
        doCommitOffsetsSync();
    }

    public void clearSubscription() {
        this.subcriptionSnapshot.clear();
    }

    private void doCommitOffsetsSync() {
        try {
            commitOffsetsSync(this.subscriptions.allConsumed(), 0L);
        } catch (Exception e) {
        } finally {
            this.nextAutoCommitDeadline = this.time.milliseconds() + this.autoCommitIntervalMs;
        }
    }

    public void unsubscribe() {
        handleUnsubscribeResponse(this.client.send(this.client.newClientRequest(null, new UnsubscribeRequest.Builder(), this.time.milliseconds(), true), this.time.milliseconds()));
    }

    private void handleUnsubscribeResponse(ClientResponse clientResponse) {
        for (Map.Entry<String, Exception> entry : ((UnsubscribeResponse) clientResponse.responseBody()).response().entrySet()) {
            if (entry.getValue() == null) {
                this.log.trace("Failed to unsubscribe from topic: with exception: ", entry.getKey(), entry.getValue());
            } else {
                this.log.trace("Unsubscribed from topic: ", entry.getKey());
            }
        }
    }

    public long timeToNextPoll(long j, long j2) {
        if (!this.autoCommitEnabled) {
            return j2;
        }
        if (j > this.nextAutoCommitDeadline) {
            return 0L;
        }
        return Math.min(this.nextAutoCommitDeadline - j, j2);
    }

    public void close(long j) throws Exception {
        KafkaException kafkaException = null;
        if (this.autoCommitEnabled) {
            this.subscriptions.allConsumed();
            try {
                commitOffsetsSync(this.subscriptions.allConsumed(), j);
            } catch (Exception e) {
                kafkaException = new KafkaException("failed to commit consumed messages", e);
            }
        }
        this.client.close();
        if (kafkaException != null) {
            throw kafkaException;
        }
    }
}
