package org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kafka-clients-0.8.2.2.jar:org/apache/kafka/clients/NetworkClient.class */
public class NetworkClient implements KafkaClient {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) NetworkClient.class);
    private final Selectable selector;
    private final Metadata metadata;
    private final ClusterConnectionStates connectionStates;
    private final InFlightRequests inFlightRequests;
    private final int socketSendBuffer;
    private final int socketReceiveBuffer;
    private final String clientId;
    private int correlation = 0;
    private final int nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
    private boolean metadataFetchInProgress = false;
    private long lastNoNodeAvailableMs = 0;

    public NetworkClient(Selectable selectable, Metadata metadata, String str, int i, long j, int i2, int i3) {
        this.selector = selectable;
        this.metadata = metadata;
        this.clientId = str;
        this.inFlightRequests = new InFlightRequests(i);
        this.connectionStates = new ClusterConnectionStates(j);
        this.socketSendBuffer = i2;
        this.socketReceiveBuffer = i3;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public boolean ready(Node node, long j) {
        if (isReady(node, j)) {
            return true;
        }
        if (!this.connectionStates.canConnect(node.id(), j)) {
            return false;
        }
        initiateConnect(node, j);
        return false;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public long connectionDelay(Node node, long j) {
        return this.connectionStates.connectionDelay(node.id(), j);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public boolean isReady(Node node, long j) {
        int id = node.id();
        if (this.metadataFetchInProgress || this.metadata.timeToNextUpdate(j) != 0) {
            return isSendable(id);
        }
        return false;
    }

    private boolean isSendable(int i) {
        return this.connectionStates.isConnected(i) && this.inFlightRequests.canSendMore(i);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public List<ClientResponse> poll(List<ClientRequest> list, long j, long j2) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            ClientRequest clientRequest = list.get(i);
            int destination = clientRequest.request().destination();
            if (!isSendable(destination)) {
                throw new IllegalStateException("Attempt to send a request to node " + destination + " which is not ready.");
            }
            this.inFlightRequests.add(clientRequest);
            arrayList.add(clientRequest.request());
        }
        long max = Math.max(Math.max(this.metadata.timeToNextUpdate(j2), Math.max((this.lastNoNodeAvailableMs + this.metadata.refreshBackoff()) - j2, 0L)), this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
        if (!this.metadataFetchInProgress && max == 0) {
            maybeUpdateMetadata(arrayList, j2);
        }
        try {
            this.selector.poll(Math.min(j, max), arrayList);
        } catch (IOException e) {
            log.error("Unexpected error during I/O in producer network thread", (Throwable) e);
        }
        ArrayList arrayList2 = new ArrayList();
        handleCompletedSends(arrayList2, j2);
        handleCompletedReceives(arrayList2, j2);
        handleDisconnections(arrayList2, j2);
        handleConnections();
        return arrayList2;
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public int inFlightRequestCount() {
        return this.inFlightRequests.inFlightRequestCount();
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public RequestHeader nextRequestHeader(ApiKeys apiKeys) {
        short s = apiKeys.id;
        String str = this.clientId;
        int i = this.correlation;
        this.correlation = i + 1;
        return new RequestHeader(s, str, i);
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public void wakeup() {
        this.selector.wakeup();
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public void close() {
        this.selector.close();
    }

    @Override // org.apache.kafka.clients.KafkaClient
    public Node leastLoadedNode(long j) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int i = Integer.MAX_VALUE;
        Node node = null;
        for (int i2 = 0; i2 < nodes.size(); i2++) {
            Node node2 = nodes.get(Utils.abs((this.nodeIndexOffset + i2) % nodes.size()));
            int inFlightRequestCount = this.inFlightRequests.inFlightRequestCount(node2.id());
            if (inFlightRequestCount == 0 && this.connectionStates.isConnected(node2.id())) {
                return node2;
            }
            if (!this.connectionStates.isBlackedOut(node2.id(), j) && inFlightRequestCount < i) {
                i = inFlightRequestCount;
                node = node2;
            }
        }
        return node;
    }

    private void handleCompletedSends(List<ClientResponse> list, long j) {
        for (NetworkSend networkSend : this.selector.completedSends()) {
            ClientRequest lastSent = this.inFlightRequests.lastSent(networkSend.destination());
            if (!lastSent.expectResponse()) {
                this.inFlightRequests.completeLastSent(networkSend.destination());
                list.add(new ClientResponse(lastSent, j, false, null));
            }
        }
    }

    private void handleCompletedReceives(List<ClientResponse> list, long j) {
        for (NetworkReceive networkReceive : this.selector.completedReceives()) {
            ClientRequest completeNext = this.inFlightRequests.completeNext(networkReceive.source());
            ResponseHeader parse = ResponseHeader.parse(networkReceive.payload());
            short apiKey = completeNext.request().header().apiKey();
            Struct struct = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(networkReceive.payload());
            correlate(completeNext.request().header(), parse);
            if (apiKey == ApiKeys.METADATA.id) {
                handleMetadataResponse(completeNext.request().header(), struct, j);
            } else {
                list.add(new ClientResponse(completeNext, j, false, struct));
            }
        }
    }

    private void handleMetadataResponse(RequestHeader requestHeader, Struct struct, long j) {
        this.metadataFetchInProgress = false;
        Cluster cluster = new MetadataResponse(struct).cluster();
        if (cluster.nodes().size() > 0) {
            this.metadata.update(cluster, j);
        } else {
            log.trace("Ignoring empty metadata response with correlation id {}.", Integer.valueOf(requestHeader.correlationId()));
            this.metadata.failedUpdate(j);
        }
    }

    private void handleDisconnections(List<ClientResponse> list, long j) {
        Iterator<Integer> it = this.selector.disconnected().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            this.connectionStates.disconnected(intValue);
            log.debug("Node {} disconnected.", Integer.valueOf(intValue));
            for (ClientRequest clientRequest : this.inFlightRequests.clearAll(intValue)) {
                log.trace("Cancelled request {} due to node {} being disconnected", clientRequest, Integer.valueOf(intValue));
                if (ApiKeys.forId(clientRequest.request().header().apiKey()) == ApiKeys.METADATA) {
                    this.metadataFetchInProgress = false;
                } else {
                    list.add(new ClientResponse(clientRequest, j, true, null));
                }
            }
        }
        if (this.selector.disconnected().size() > 0) {
            this.metadata.requestUpdate();
        }
    }

    private void handleConnections() {
        for (Integer num : this.selector.connected()) {
            log.debug("Completed connection to node {}", num);
            this.connectionStates.connected(num.intValue());
        }
    }

    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        if (requestHeader.correlationId() != responseHeader.correlationId()) {
            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + ")");
        }
    }

    private ClientRequest metadataRequest(long j, int i, Set<String> set) {
        return new ClientRequest(j, true, new RequestSend(i, nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(new ArrayList(set)).toStruct()), null);
    }

    private void maybeUpdateMetadata(List<NetworkSend> list, long j) {
        Node leastLoadedNode = leastLoadedNode(j);
        if (leastLoadedNode == null) {
            log.debug("Give up sending metadata request since no node is available");
            this.lastNoNodeAvailableMs = j;
            return;
        }
        log.debug("Trying to send metadata request to node {}", Integer.valueOf(leastLoadedNode.id()));
        if (!this.connectionStates.isConnected(leastLoadedNode.id()) || !this.inFlightRequests.canSendMore(leastLoadedNode.id())) {
            if (!this.connectionStates.canConnect(leastLoadedNode.id(), j)) {
                this.lastNoNodeAvailableMs = j;
                return;
            } else {
                log.debug("Init connection to node {} for sending metadata request in the next iteration", Integer.valueOf(leastLoadedNode.id()));
                initiateConnect(leastLoadedNode, j);
                return;
            }
        }
        Set<String> set = this.metadata.topics();
        this.metadataFetchInProgress = true;
        ClientRequest metadataRequest = metadataRequest(j, leastLoadedNode.id(), set);
        log.debug("Sending metadata request {} to node {}", metadataRequest, Integer.valueOf(leastLoadedNode.id()));
        list.add(metadataRequest.request());
        this.inFlightRequests.add(metadataRequest);
    }

    private void initiateConnect(Node node, long j) {
        try {
            log.debug("Initiating connection to node {} at {}:{}.", Integer.valueOf(node.id()), node.host(), Integer.valueOf(node.port()));
            this.connectionStates.connecting(node.id(), j);
            this.selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
        } catch (IOException e) {
            this.connectionStates.disconnected(node.id());
            this.metadata.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", Integer.valueOf(node.id()), node.host(), Integer.valueOf(node.port()), e);
        }
    }
}
