/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ApiVersions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClusterConnectionStates;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.InFlightRequests;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.KafkaClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.MetadataUpdater;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.NodeApiVersions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Cluster;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.AuthenticationException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Sensor;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelState;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selectable;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Send;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.ApiKeys;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.CommonFields;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.Errors;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.types.Struct;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AbstractRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.AbstractResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.MetadataRequest;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.MetadataResponse;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.RequestHeader;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.requests.ResponseHeader;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class NetworkClient
implements KafkaClient {
    private final Logger log;
    private final Selectable selector;
    private final MetadataUpdater metadataUpdater;
    private final Random randOffset;
    private final ClusterConnectionStates connectionStates;
    private final InFlightRequests inFlightRequests;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    private int correlation;
    private final int defaultRequestTimeoutMs;
    private final long reconnectBackoffMs;
    private final Time time;
    private final boolean discoverBrokerVersions;
    private final ApiVersions apiVersions;
    private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<String, ApiVersionsRequest.Builder>();
    private final List<ClientResponse> abortedSends = new LinkedList<ClientResponse>();
    private final Sensor throttleTimeSensor;

    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext) {
        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, time, discoverBrokerVersions, apiVersions, null, logContext);
    }

    public NetworkClient(Selectable selector, Metadata metadata, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext) {
        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, time, discoverBrokerVersions, apiVersions, throttleTimeSensor, logContext);
    }

    public NetworkClient(Selectable selector, MetadataUpdater metadataUpdater, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext) {
        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs, time, discoverBrokerVersions, apiVersions, null, logContext);
    }

    private NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selector, String clientId, int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext) {
        if (metadataUpdater == null) {
            if (metadata == null) {
                throw new IllegalArgumentException("`metadata` must not be null");
            }
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            this.metadataUpdater = metadataUpdater;
        }
        this.selector = selector;
        this.clientId = clientId;
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax);
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.correlation = 0;
        this.randOffset = new Random();
        this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
        this.reconnectBackoffMs = reconnectBackoffMs;
        this.time = time;
        this.discoverBrokerVersions = discoverBrokerVersions;
        this.apiVersions = apiVersions;
        this.throttleTimeSensor = throttleTimeSensor;
        this.log = logContext.logger(NetworkClient.class);
    }

    @Override
    public boolean ready(Node node, long now) {
        if (node.isEmpty()) {
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
        }
        if (this.isReady(node, now)) {
            return true;
        }
        if (this.connectionStates.canConnect(node.idString(), now)) {
            this.initiateConnect(node, now);
        }
        return false;
    }

    boolean canConnect(Node node, long now) {
        return this.connectionStates.canConnect(node.idString(), now);
    }

    @Override
    public void disconnect(String nodeId) {
        if (this.connectionStates.isDisconnected(nodeId)) {
            return;
        }
        this.selector.close(nodeId);
        ArrayList<ApiKeys> requestTypes = new ArrayList<ApiKeys>();
        long now = this.time.milliseconds();
        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
            if (request.isInternalRequest) {
                if (request.header.apiKey() != ApiKeys.METADATA) continue;
                this.metadataUpdater.handleDisconnection(request.destination);
                continue;
            }
            requestTypes.add(request.header.apiKey());
            this.abortedSends.add(new ClientResponse(request.header, request.callback, request.destination, request.createdTimeMs, now, true, null, null, null));
        }
        this.connectionStates.disconnected(nodeId, now);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Manually disconnected from {}. Removed requests: {}.", (Object)nodeId, (Object)Utils.join(requestTypes, ", "));
        }
    }

    @Override
    public void close(String nodeId) {
        this.selector.close(nodeId);
        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
            if (!request.isInternalRequest || request.header.apiKey() != ApiKeys.METADATA) continue;
            this.metadataUpdater.handleDisconnection(request.destination);
        }
        this.connectionStates.remove(nodeId);
    }

    @Override
    public long connectionDelay(Node node, long now) {
        return this.connectionStates.connectionDelay(node.idString(), now);
    }

    public long throttleDelayMs(Node node, long now) {
        return this.connectionStates.throttleDelayMs(node.idString(), now);
    }

    @Override
    public long pollDelayMs(Node node, long now) {
        return this.connectionStates.pollDelayMs(node.idString(), now);
    }

    @Override
    public boolean connectionFailed(Node node) {
        return this.connectionStates.isDisconnected(node.idString());
    }

    @Override
    public AuthenticationException authenticationException(Node node) {
        return this.connectionStates.authenticationException(node.idString());
    }

    @Override
    public boolean isReady(Node node, long now) {
        return !this.metadataUpdater.isUpdateDue(now) && this.canSendRequest(node.idString(), now);
    }

    private boolean canSendRequest(String node, long now) {
        return this.connectionStates.isReady(node, now) && this.selector.isChannelReady(node) && this.inFlightRequests.canSendMore(node);
    }

    @Override
    public void send(ClientRequest request, long now) {
        this.doSend(request, false, now);
    }

    private void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
        ClientRequest clientRequest = this.newClientRequest(nodeConnectionId, builder, now, true);
        this.doSend(clientRequest, true, now);
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        }
        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        try {
            short version;
            NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
            if (versionInfo == null) {
                version = builder.latestAllowedVersion();
                if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                    this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                }
            } else {
                version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
            }
            this.doSend(clientRequest, isInternalRequest, now, (AbstractRequest)builder.build(version));
        }
        catch (UnsupportedVersionException unsupportedVersionException) {
            this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException});
            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, unsupportedVersionException, null, null);
            this.abortedSends.add(clientResponse);
        }
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (this.log.isDebugEnabled()) {
            short latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {
                this.log.trace("Sending {} {} with correlation id {} to node {}", new Object[]{clientRequest.apiKey(), request, clientRequest.correlationId(), destination});
            } else {
                this.log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}", new Object[]{header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination});
            }
        }
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest);
        this.selector.send(send);
    }

    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        if (!this.abortedSends.isEmpty()) {
            ArrayList<ClientResponse> responses = new ArrayList<ClientResponse>();
            this.handleAbortedSends(responses);
            this.completeResponses(responses);
            return responses;
        }
        long metadataTimeout = this.metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, this.defaultRequestTimeoutMs));
        }
        catch (IOException e) {
            this.log.error("Unexpected error during I/O", (Throwable)e);
        }
        long updatedNow = this.time.milliseconds();
        ArrayList<ClientResponse> responses = new ArrayList<ClientResponse>();
        this.handleCompletedSends(responses, updatedNow);
        this.handleCompletedReceives(responses, updatedNow);
        this.handleDisconnections(responses, updatedNow);
        this.handleConnections();
        this.handleInitiateApiVersionRequests(updatedNow);
        this.handleTimedOutRequests(responses, updatedNow);
        this.completeResponses(responses);
        return responses;
    }

    private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            }
            catch (Exception e) {
                this.log.error("Uncaught error in request completion:", (Throwable)e);
            }
        }
    }

    @Override
    public int inFlightRequestCount() {
        return this.inFlightRequests.count();
    }

    @Override
    public boolean hasInFlightRequests() {
        return !this.inFlightRequests.isEmpty();
    }

    @Override
    public int inFlightRequestCount(String node) {
        return this.inFlightRequests.count(node);
    }

    @Override
    public boolean hasInFlightRequests(String node) {
        return !this.inFlightRequests.isEmpty(node);
    }

    @Override
    public boolean hasReadyNodes(long now) {
        return this.connectionStates.hasReadyNodes(now);
    }

    @Override
    public void wakeup() {
        this.selector.wakeup();
    }

    @Override
    public void close() {
        this.selector.close();
        this.metadataUpdater.close();
    }

    @Override
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        int offset = this.randOffset.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); ++i) {
            int idx = (offset + i) % nodes.size();
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.count(node.idString());
            if (currInflight == 0 && this.isReady(node, now)) {
                this.log.trace("Found least loaded node {} connected with no in-flight requests", (Object)node);
                return node;
            }
            if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
                inflight = currInflight;
                found = node;
                continue;
            }
            if (!this.log.isTraceEnabled()) continue;
            this.log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}", new Object[]{node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight});
        }
        if (found != null) {
            this.log.trace("Found least loaded node {}", found);
        } else {
            this.log.trace("Least loaded node selection failed to find an available node");
        }
        return found;
    }

    public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
        Struct responseStruct = NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0L);
        return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct);
    }

    private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, Sensor throttleTimeSensor, long now) {
        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
        Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
        NetworkClient.correlate(requestHeader, responseHeader);
        if (throttleTimeSensor != null && responseBody.hasField(CommonFields.THROTTLE_TIME_MS)) {
            throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS).intValue(), now);
        }
        return responseBody;
    }

    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
        this.connectionStates.disconnected(nodeId, now);
        this.apiVersions.remove(nodeId);
        this.nodesNeedingApiVersionsFetch.remove(nodeId);
        switch (disconnectState.state()) {
            case AUTHENTICATION_FAILED: {
                AuthenticationException exception = disconnectState.exception();
                this.connectionStates.authenticationFailed(nodeId, now, exception);
                this.metadataUpdater.handleAuthenticationFailure(exception);
                this.log.error("Connection to node {} failed authentication due to: {}", (Object)nodeId, (Object)exception.getMessage());
                break;
            }
            case AUTHENTICATE: {
                this.log.warn("Connection to node {} terminated during authentication. This may indicate that authentication failed due to invalid credentials.", (Object)nodeId);
                break;
            }
            case NOT_CONNECTED: {
                this.log.warn("Connection to node {} could not be established. Broker may not be available.", (Object)nodeId);
                break;
            }
        }
        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
            this.log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected", new Object[]{request.header.apiKey(), request.request, request.header.correlationId(), nodeId});
            if (!request.isInternalRequest) {
                responses.add(request.disconnected(now, disconnectState.exception()));
                continue;
            }
            if (request.header.apiKey() != ApiKeys.METADATA) continue;
            this.metadataUpdater.handleDisconnection(request.destination);
        }
    }

    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
        for (String nodeId : nodeIds) {
            this.selector.close(nodeId);
            this.log.debug("Disconnecting from node {} due to request timeout.", (Object)nodeId);
            this.processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
        }
        if (!nodeIds.isEmpty()) {
            this.metadataUpdater.requestUpdate();
        }
    }

    private void handleAbortedSends(List<ClientResponse> responses) {
        responses.addAll(this.abortedSends);
        this.abortedSends.clear();
    }

    private void handleCompletedSends(List<ClientResponse> responses, long now) {
        for (Send send : this.selector.completedSends()) {
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            if (request.expectResponse) continue;
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(request.completed(null, now));
        }
    }

    private void maybeThrottle(AbstractResponse response, short apiVersion, String nodeId, long now) {
        int throttleTimeMs = response.throttleTimeMs();
        if (throttleTimeMs > 0 && response.shouldClientThrottle(apiVersion)) {
            this.connectionStates.throttle(nodeId, now + (long)throttleTimeMs);
            this.log.trace("Connection to node {} is throttled for {} ms until timestamp {}", new Object[]{nodeId, throttleTimeMs, now + (long)throttleTimeMs});
        }
    }

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            InFlightRequest req = this.inFlightRequests.completeNext(source);
            Struct responseStruct = NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, this.throttleTimeSensor, now);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Completed receive from node {} for {} with correlation id {}, received {}", new Object[]{req.destination, req.header.apiKey(), req.header.correlationId(), responseStruct});
            }
            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
            this.maybeThrottle(body, req.header.apiVersion(), req.destination, now);
            if (req.isInternalRequest && body instanceof MetadataResponse) {
                this.metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse)body);
                continue;
            }
            if (req.isInternalRequest && body instanceof ApiVersionsResponse) {
                this.handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse)body);
                continue;
            }
            responses.add(req.completed(body, now));
        }
    }

    private void handleApiVersionsResponse(List<ClientResponse> responses, InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
        String node = req.destination;
        if (apiVersionsResponse.error() != Errors.NONE) {
            if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) {
                this.log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.", new Object[]{apiVersionsResponse.error(), node, req.header.correlationId()});
                this.selector.close(node);
                this.processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
            } else {
                this.nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder(0));
            }
            return;
        }
        NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions());
        this.apiVersions.update(node, nodeVersionInfo);
        this.connectionStates.ready(node);
        this.log.debug("Recorded API versions for node {}: {}", (Object)node, (Object)nodeVersionInfo);
    }

    private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
            String node = entry.getKey();
            this.log.debug("Node {} disconnected.", (Object)node);
            this.processDisconnection(responses, node, now, entry.getValue());
        }
        if (this.selector.disconnected().size() > 0) {
            this.metadataUpdater.requestUpdate();
        }
    }

    private void handleConnections() {
        for (String node : this.selector.connected()) {
            if (this.discoverBrokerVersions) {
                this.connectionStates.checkingApiVersions(node);
                this.nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                this.log.debug("Completed connection to node {}. Fetching API versions.", (Object)node);
                continue;
            }
            this.connectionStates.ready(node);
            this.log.debug("Completed connection to node {}. Ready.", (Object)node);
        }
    }

    private void handleInitiateApiVersionRequests(long now) {
        Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = this.nodesNeedingApiVersionsFetch.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
            String node = entry.getKey();
            if (!this.selector.isChannelReady(node) || !this.inFlightRequests.canSendMore(node)) continue;
            this.log.debug("Initiating API versions fetch from node {}.", (Object)node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = this.newClientRequest(node, apiVersionRequestBuilder, now, true);
            this.doSend(clientRequest, true, now);
            iter.remove();
        }
    }

    private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
        }
    }

    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            this.log.debug("Initiating connection to node {}", (Object)node);
            this.connectionStates.connecting(nodeConnectionId, now);
            this.selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
        }
        catch (IOException e) {
            this.connectionStates.disconnected(nodeConnectionId, now);
            this.metadataUpdater.requestUpdate();
            this.log.warn("Error connecting to node {}", (Object)node, (Object)e);
        }
    }

    @Override
    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse) {
        return this.newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, this.defaultRequestTimeoutMs, null);
    }

    @Override
    public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse, int requestTimeoutMs, RequestCompletionHandler callback) {
        return new ClientRequest(nodeId, requestBuilder, this.correlation++, this.clientId, createdTimeMs, expectResponse, requestTimeoutMs, callback);
    }

    public boolean discoverBrokerVersions() {
        return this.discoverBrokerVersions;
    }

    static class InFlightRequest {
        final RequestHeader header;
        final String destination;
        final RequestCompletionHandler callback;
        final boolean expectResponse;
        final AbstractRequest request;
        final boolean isInternalRequest;
        final Send send;
        final long sendTimeMs;
        final long createdTimeMs;
        final long requestTimeoutMs;

        public InFlightRequest(ClientRequest clientRequest, RequestHeader header, boolean isInternalRequest, AbstractRequest request, Send send, long sendTimeMs) {
            this(header, clientRequest.requestTimeoutMs(), clientRequest.createdTimeMs(), clientRequest.destination(), clientRequest.callback(), clientRequest.expectResponse(), isInternalRequest, request, send, sendTimeMs);
        }

        public InFlightRequest(RequestHeader header, int requestTimeoutMs, long createdTimeMs, String destination, RequestCompletionHandler callback, boolean expectResponse, boolean isInternalRequest, AbstractRequest request, Send send, long sendTimeMs) {
            this.header = header;
            this.requestTimeoutMs = requestTimeoutMs;
            this.createdTimeMs = createdTimeMs;
            this.destination = destination;
            this.callback = callback;
            this.expectResponse = expectResponse;
            this.isInternalRequest = isInternalRequest;
            this.request = request;
            this.send = send;
            this.sendTimeMs = sendTimeMs;
        }

        public ClientResponse completed(AbstractResponse response, long timeMs) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, timeMs, false, null, null, response);
        }

        public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) {
            return new ClientResponse(this.header, this.callback, this.destination, this.createdTimeMs, timeMs, true, null, authenticationException, null);
        }

        public String toString() {
            return "InFlightRequest(header=" + this.header + ", destination=" + this.destination + ", expectResponse=" + this.expectResponse + ", createdTimeMs=" + this.createdTimeMs + ", sendTimeMs=" + this.sendTimeMs + ", isInternalRequest=" + this.isInternalRequest + ", request=" + this.request + ", callback=" + this.callback + ", send=" + this.send + ")";
        }
    }

    class DefaultMetadataUpdater
    implements MetadataUpdater {
        private final Metadata metadata;
        private boolean metadataFetchInProgress;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
        }

        @Override
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        @Override
        public boolean isUpdateDue(long now) {
            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0L;
        }

        @Override
        public long maybeUpdate(long now) {
            long waitForMetadataFetch;
            long timeToNextMetadataUpdate = this.metadata.timeToNextUpdate(now);
            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch = this.metadataFetchInProgress ? (long)NetworkClient.this.defaultRequestTimeoutMs : 0L);
            if (metadataTimeout > 0L) {
                return metadataTimeout;
            }
            Node node = NetworkClient.this.leastLoadedNode(now);
            if (node == null) {
                NetworkClient.this.log.debug("Give up sending metadata request since no node is available");
                return NetworkClient.this.reconnectBackoffMs;
            }
            return this.maybeUpdate(now, node);
        }

        @Override
        public void handleDisconnection(String destination) {
            int nodeId;
            Node node;
            Cluster cluster = this.metadata.fetch();
            if (cluster.isBootstrapConfigured() && (node = cluster.nodeById(nodeId = Integer.parseInt(destination))) != null) {
                NetworkClient.this.log.warn("Bootstrap broker {} disconnected", (Object)node);
            }
            this.metadataFetchInProgress = false;
        }

        @Override
        public void handleAuthenticationFailure(AuthenticationException exception) {
            this.metadataFetchInProgress = false;
            if (this.metadata.updateRequested()) {
                this.metadata.failedUpdate(NetworkClient.this.time.milliseconds(), exception);
            }
        }

        @Override
        public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            Map<String, Errors> errors;
            this.metadataFetchInProgress = false;
            Cluster cluster = response.cluster();
            List missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata -> topicMetadata.partitionMetadata().stream().filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND).map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition()))).collect(Collectors.toList());
            if (!missingListenerPartitions.isEmpty()) {
                int count = missingListenerPartitions.size();
                NetworkClient.this.log.warn("{} partitions have leader brokers without a matching listener, including {}", (Object)count, missingListenerPartitions.subList(0, Math.min(10, count)));
            }
            if (!(errors = response.errors()).isEmpty()) {
                NetworkClient.this.log.warn("Error while fetching metadata with correlation id {} : {}", (Object)requestHeader.correlationId(), errors);
            }
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, response.unavailableTopics(), now);
            } else {
                NetworkClient.this.log.trace("Ignoring empty metadata response with correlation id {}.", (Object)requestHeader.correlationId());
                this.metadata.failedUpdate(now, null);
            }
        }

        @Override
        public void requestUpdate() {
            this.metadata.requestUpdate();
        }

        @Override
        public void close() {
            this.metadata.close();
        }

        private boolean isAnyNodeConnecting() {
            for (Node node : this.fetchNodes()) {
                if (!NetworkClient.this.connectionStates.isConnecting(node.idString())) continue;
                return true;
            }
            return false;
        }

        private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();
            if (NetworkClient.this.canSendRequest(nodeConnectionId, now)) {
                this.metadataFetchInProgress = true;
                MetadataRequest.Builder metadataRequest = this.metadata.needMetadataForAllTopics() ? MetadataRequest.Builder.allTopics() : new MetadataRequest.Builder(new ArrayList<String>(this.metadata.topics()), this.metadata.allowAutoTopicCreation());
                NetworkClient.this.log.debug("Sending metadata request {} to node {}", (Object)metadataRequest, (Object)node);
                NetworkClient.this.sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                return NetworkClient.this.defaultRequestTimeoutMs;
            }
            if (this.isAnyNodeConnecting()) {
                return NetworkClient.this.reconnectBackoffMs;
            }
            if (NetworkClient.this.connectionStates.canConnect(nodeConnectionId, now)) {
                NetworkClient.this.log.debug("Initialize connection to node {} for sending metadata request", (Object)node);
                NetworkClient.this.initiateConnect(node, now);
                return NetworkClient.this.reconnectBackoffMs;
            }
            return Long.MAX_VALUE;
        }
    }
}

