/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerNetworkClient
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
    private static final long MAX_POLL_TIMEOUT_MS = 5000L;
    private final KafkaClient client;
    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
    private final Metadata metadata;
    private final Time time;
    private final long retryBackoffMs;
    private final long unsentExpiryMs;
    private int wakeupDisabledCount = 0;
    private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue();
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    public ConsumerNetworkClient(KafkaClient client, Metadata metadata, Time time, long retryBackoffMs, long requestTimeoutMs) {
        this.client = client;
        this.metadata = metadata;
        this.time = time;
        this.retryBackoffMs = retryBackoffMs;
        this.unsentExpiryMs = requestTimeoutMs;
    }

    public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
        long now = this.time.milliseconds();
        RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
        ClientRequest clientRequest = this.client.newClientRequest(node.idString(), requestBuilder, now, true, completionHandler);
        this.put(node, clientRequest);
        this.client.wakeup();
        return completionHandler.future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void put(Node node, ClientRequest request) {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            List<ClientRequest> nodeUnsent = this.unsent.get(node);
            if (nodeUnsent == null) {
                nodeUnsent = new ArrayList<ClientRequest>();
                this.unsent.put(node, nodeUnsent);
            }
            nodeUnsent.add(request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Node leastLoadedNode() {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            return this.client.leastLoadedNode(this.time.milliseconds());
        }
    }

    public void awaitMetadataUpdate() {
        this.awaitMetadataUpdate(Long.MAX_VALUE);
    }

    public boolean awaitMetadataUpdate(long timeout) {
        long startMs = this.time.milliseconds();
        int version = this.metadata.requestUpdate();
        do {
            this.poll(timeout);
        } while (this.metadata.version() == version && this.time.milliseconds() - startMs < timeout);
        return this.metadata.version() > version;
    }

    public void ensureFreshMetadata() {
        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(this.time.milliseconds()) == 0L) {
            this.awaitMetadataUpdate();
        }
    }

    public void wakeup() {
        log.trace("Received user wakeup");
        this.wakeup.set(true);
        this.client.wakeup();
    }

    public void poll(RequestFuture<?> future) {
        while (!future.isDone()) {
            this.poll(5000L, this.time.milliseconds(), future);
        }
    }

    public boolean poll(RequestFuture<?> future, long timeout) {
        long begin = this.time.milliseconds();
        long remaining = timeout;
        long now = begin;
        do {
            this.poll(remaining, now, future);
            now = this.time.milliseconds();
            long elapsed = now - begin;
            remaining = timeout - elapsed;
        } while (!future.isDone() && remaining > 0L);
        return future.isDone();
    }

    public void poll(long timeout) {
        this.poll(timeout, this.time.milliseconds(), null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void poll(long timeout, long now, PollCondition pollCondition) {
        this.firePendingCompletedRequests();
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            this.trySend(now);
            if (pollCondition == null || pollCondition.shouldBlock()) {
                if (this.client.inFlightRequestCount() == 0) {
                    timeout = Math.min(timeout, this.retryBackoffMs);
                }
                this.client.poll(Math.min(5000L, timeout), now);
                now = this.time.milliseconds();
            } else {
                this.client.poll(0L, now);
            }
            this.checkDisconnects(now);
            this.maybeTriggerWakeup();
            this.maybeThrowInterruptException();
            this.trySend(now);
            this.failExpiredRequests(now);
        }
        this.firePendingCompletedRequests();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNoWakeup() {
        this.disableWakeups();
        try {
            this.poll(0L, this.time.milliseconds(), null);
        }
        finally {
            this.enableWakeups();
        }
    }

    public boolean awaitPendingRequests(Node node, long timeoutMs) {
        long startMs = this.time.milliseconds();
        long remainingMs = timeoutMs;
        while (this.pendingRequestCount(node) > 0 && remainingMs > 0L) {
            this.poll(remainingMs);
            remainingMs = timeoutMs - (this.time.milliseconds() - startMs);
        }
        return this.pendingRequestCount(node) == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int pendingRequestCount(Node node) {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            List<ClientRequest> pending = this.unsent.get(node);
            int unsentCount = pending == null ? 0 : pending.size();
            return unsentCount + this.client.inFlightRequestCount(node.idString());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int pendingRequestCount() {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            int total = 0;
            for (List<ClientRequest> requests : this.unsent.values()) {
                total += requests.size();
            }
            return total + this.client.inFlightRequestCount();
        }
    }

    private void firePendingCompletedRequests() {
        RequestFutureCompletionHandler completionHandler;
        boolean completedRequestsFired = false;
        while ((completionHandler = this.pendingCompletion.poll()) != null) {
            completionHandler.fireCompletion();
            completedRequestsFired = true;
        }
        if (completedRequestsFired) {
            this.client.wakeup();
        }
    }

    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator2 = this.unsent.entrySet().iterator();
        while (iterator2.hasNext()) {
            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator2.next();
            Node node = requestEntry.getKey();
            if (!this.client.connectionFailed(node)) continue;
            iterator2.remove();
            for (ClientRequest request : requestEntry.getValue()) {
                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)request.callback();
                handler.onComplete(new ClientResponse(request.makeHeader(), request.callback(), request.destination(), request.createdTimeMs(), now, true, null, null));
            }
        }
    }

    private void failExpiredRequests(long now) {
        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator2 = this.unsent.entrySet().iterator();
        while (iterator2.hasNext()) {
            ClientRequest request;
            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator2.next();
            Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
            while (requestIterator.hasNext() && (request = requestIterator.next()).createdTimeMs() < now - this.unsentExpiryMs) {
                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)request.callback();
                handler.onFailure(new TimeoutException("Failed to send request after " + this.unsentExpiryMs + " ms."));
                requestIterator.remove();
            }
            if (!requestEntry.getValue().isEmpty()) continue;
            iterator2.remove();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void failUnsentRequests(Node node, RuntimeException e) {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            List<ClientRequest> unsentRequests = this.unsent.remove(node);
            if (unsentRequests != null) {
                for (ClientRequest unsentRequest : unsentRequests) {
                    RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler)unsentRequest.callback();
                    handler.onFailure(e);
                }
            }
        }
        this.firePendingCompletedRequests();
    }

    private boolean trySend(long now) {
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry : this.unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator2 = requestEntry.getValue().iterator();
            while (iterator2.hasNext()) {
                ClientRequest request = iterator2.next();
                if (!this.client.ready(node, now)) continue;
                this.client.send(request, now);
                iterator2.remove();
                requestsSent = true;
            }
        }
        return requestsSent;
    }

    private void maybeTriggerWakeup() {
        if (this.wakeupDisabledCount == 0 && this.wakeup.get()) {
            log.trace("Raising wakeup exception in response to user wakeup");
            this.wakeup.set(false);
            throw new WakeupException();
        }
    }

    private void maybeThrowInterruptException() {
        if (Thread.interrupted()) {
            throw new InterruptException(new InterruptedException());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void disableWakeups() {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            ++this.wakeupDisabledCount;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void enableWakeups() {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            if (this.wakeupDisabledCount <= 0) {
                throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
            }
            --this.wakeupDisabledCount;
            if (this.wakeupDisabledCount == 0 && this.wakeup.get()) {
                this.client.wakeup();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            this.client.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean connectionFailed(Node node) {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            return this.client.connectionFailed(node);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void tryConnect(Node node) {
        ConsumerNetworkClient consumerNetworkClient = this;
        synchronized (consumerNetworkClient) {
            this.client.ready(node, this.time.milliseconds());
        }
    }

    public static interface PollCondition {
        public boolean shouldBlock();
    }

    public class RequestFutureCompletionHandler
    implements RequestCompletionHandler {
        private final RequestFuture<ClientResponse> future = new RequestFuture();
        private ClientResponse response;
        private RuntimeException e;

        public void fireCompletion() {
            if (this.e != null) {
                this.future.raise(this.e);
            } else if (this.response.wasDisconnected()) {
                RequestHeader requestHeader = this.response.requestHeader();
                ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
                int correlation = requestHeader.correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", new Object[]{api, requestHeader, correlation, this.response.destination()});
                this.future.raise(DisconnectException.INSTANCE);
            } else if (this.response.versionMismatch() != null) {
                this.future.raise(this.response.versionMismatch());
            } else {
                this.future.complete(this.response);
            }
        }

        public void onFailure(RuntimeException e) {
            this.e = e;
            ConsumerNetworkClient.this.pendingCompletion.add(this);
        }

        @Override
        public void onComplete(ClientResponse response) {
            this.response = response;
            ConsumerNetworkClient.this.pendingCompletion.add(this);
        }
    }
}

