package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest.class */
public class NetworkClientTest {
    protected final int defaultRequestTimeoutMs = 1000;
    protected final MockTime time = new MockTime();
    protected final MockSelector selector = new MockSelector(this.time);
    protected final Node node = (Node) TestUtils.singletonCluster().nodes().iterator().next();
    protected final long reconnectBackoffMsTest = 10000;
    protected final long reconnectBackoffMaxMsTest = 100000;
    protected final long connectionSetupTimeoutMsTest = 5000;
    protected final long connectionSetupTimeoutMaxMsTest = 127000;
    private final int reconnectBackoffExpBase = 2;
    private final double reconnectBackoffJitter = 0.2d;
    private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(this.node));
    private final NetworkClient client = createNetworkClient(100000);
    private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(10000);
    private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
    private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
    private static ArrayList<InetAddress> initialAddresses;
    private static ArrayList<InetAddress> newAddresses;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest$TestCallbackHandler.class */
    public static class TestCallbackHandler implements RequestCompletionHandler {
        public boolean executed;
        public ClientResponse response;

        private TestCallbackHandler() {
            this.executed = false;
        }

        public void onComplete(ClientResponse clientResponse) {
            this.executed = true;
            this.response = clientResponse;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest$TestMetadataUpdater.class */
    public static class TestMetadataUpdater extends ManualMetadataUpdater {
        KafkaException failure;

        public TestMetadataUpdater(List<Node> list) {
            super(list);
        }

        public void handleServerDisconnect(long j, String str, Optional<AuthenticationException> optional) {
            optional.ifPresent(authenticationException -> {
                this.failure = authenticationException;
            });
            super.handleServerDisconnect(j, str, optional);
        }

        public void handleFailedRequest(long j, Optional<KafkaException> optional) {
            optional.ifPresent(kafkaException -> {
                this.failure = kafkaException;
            });
        }

        public KafkaException getAndClearFailure() {
            KafkaException kafkaException = this.failure;
            this.failure = null;
            return kafkaException;
        }
    }

    private NetworkClient createNetworkClient(long j) {
        return new NetworkClient(this.selector, this.metadataUpdater, "mock", Integer.MAX_VALUE, 10000L, j, 65536, 65536, 1000, 5000L, 127000L, this.time, true, new ApiVersions(), new LogContext());
    }

    private NetworkClient createNetworkClientWithMultipleNodes(long j, long j2, int i) {
        return new NetworkClient(this.selector, new TestMetadataUpdater(TestUtils.clusterWith(i).nodes()), "mock", Integer.MAX_VALUE, 10000L, j, 65536, 65536, 1000, j2, 127000L, this.time, true, new ApiVersions(), new LogContext());
    }

    private NetworkClient createNetworkClientWithStaticNodes() {
        return new NetworkClient(this.selector, this.metadataUpdater, "mock-static", Integer.MAX_VALUE, 0L, 0L, 65536, 65536, 1000, 5000L, 127000L, this.time, true, new ApiVersions(), new LogContext());
    }

    private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
        return new NetworkClient(this.selector, metadata, "mock", Integer.MAX_VALUE, 10000L, 0L, 65536, 65536, 1000, 5000L, 127000L, this.time, false, new ApiVersions(), new LogContext());
    }

    private NetworkClient createNetworkClientWithNoVersionDiscovery() {
        return new NetworkClient(this.selector, this.metadataUpdater, "mock", Integer.MAX_VALUE, 10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, this.time, false, new ApiVersions(), new LogContext());
    }

    @BeforeEach
    public void setup() {
        this.selector.reset();
    }

    @Test
    public void testSendToUnreadyNode() {
        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true);
        long milliseconds = this.time.milliseconds();
        ClientRequest newClientRequest = this.client.newClientRequest("5", builder, milliseconds, false);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.client.send(newClientRequest, milliseconds);
        });
    }

    @Test
    public void testSimpleRequestResponse() {
        checkSimpleRequestResponse(this.client);
    }

    @Test
    public void testSimpleRequestResponseWithStaticNodes() {
        checkSimpleRequestResponse(this.clientWithStaticNodes);
    }

    @Test
    public void testSimpleRequestResponseWithNoBrokerDiscovery() {
        checkSimpleRequestResponse(this.clientWithNoVersionDiscovery);
    }

    @Test
    public void testDnsLookupFailure() {
        Assertions.assertFalse(this.client.ready(new Node(1234, "badhost", 1234), this.time.milliseconds()));
    }

    @Test
    public void testClose() {
        this.client.ready(this.node, this.time.milliseconds());
        awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()), "The client should be ready");
        this.client.send(this.client.newClientRequest(this.node.idString(), ProduceRequest.forCurrentMagic(new ProduceRequestData().setTopicData(new ProduceRequestData.TopicProduceDataCollection()).setAcks((short) 1).setTimeoutMs(1000)), this.time.milliseconds(), true), this.time.milliseconds());
        Assertions.assertEquals(1, this.client.inFlightRequestCount(this.node.idString()), "There should be 1 in-flight request after send");
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertTrue(this.client.hasInFlightRequests());
        this.client.close(this.node.idString());
        Assertions.assertEquals(0, this.client.inFlightRequestCount(this.node.idString()), "There should be no in-flight request after close");
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertFalse(this.client.isReady(this.node, 0L), "Connection should not be ready after close");
    }

    @Test
    public void testUnsupportedVersionDuringInternalMetadataRequest() {
        this.client.sendInternalMetadataRequest(new MetadataRequest.Builder(Collections.singletonList("topic_1"), false, (short) 3), this.node.idString(), this.time.milliseconds());
        Assertions.assertEquals(UnsupportedVersionException.class, this.metadataUpdater.getAndClearFailure().getClass());
    }

    private void checkSimpleRequestResponse(NetworkClient networkClient) {
        awaitReady(networkClient, this.node);
        short latestVersion = ApiKeys.PRODUCE.latestVersion();
        ProduceRequest.Builder builder = new ProduceRequest.Builder(latestVersion, latestVersion, new ProduceRequestData().setAcks((short) 1).setTimeoutMs(1000));
        TestCallbackHandler testCallbackHandler = new TestCallbackHandler();
        ClientRequest newClientRequest = networkClient.newClientRequest(this.node.idString(), builder, this.time.milliseconds(), true, 1000, testCallbackHandler);
        networkClient.send(newClientRequest, this.time.milliseconds());
        networkClient.poll(1L, this.time.milliseconds());
        Assertions.assertEquals(1, networkClient.inFlightRequestCount());
        this.selector.completeReceive(new NetworkReceive(this.node.idString(), RequestTestUtils.serializeResponseWithHeader(new ProduceResponse(new ProduceResponseData()), latestVersion, newClientRequest.correlationId())));
        Assertions.assertEquals(1, networkClient.poll(1L, this.time.milliseconds()).size());
        Assertions.assertTrue(testCallbackHandler.executed, "The handler should have executed.");
        Assertions.assertTrue(testCallbackHandler.response.hasResponse(), "Should have a response body.");
        Assertions.assertEquals(newClientRequest.correlationId(), testCallbackHandler.response.requestHeader().correlationId(), "Should be correlated to the original request");
    }

    private void delayedApiVersionsResponse(int i, short s, ApiVersionsResponse apiVersionsResponse) {
        this.selector.delayedReceive(new DelayedReceive(this.node.idString(), new NetworkReceive(this.node.idString(), RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, s, i))));
    }

    private void setExpectedApiVersionsResponse(ApiVersionsResponse apiVersionsResponse) {
        delayedApiVersionsResponse(0, apiVersionsResponse.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion(), apiVersionsResponse);
    }

    private void awaitReady(NetworkClient networkClient, Node node) {
        if (networkClient.discoverBrokerVersions()) {
            setExpectedApiVersionsResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
        }
        while (!networkClient.ready(node, this.time.milliseconds())) {
            networkClient.poll(1L, this.time.milliseconds());
        }
        this.selector.clear();
    }

    @Test
    public void testInvalidApiVersionsRequest() {
        this.client.ready(this.node, this.time.milliseconds());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code()).setThrottleTimeMs(0)));
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertFalse(this.client.isReady(this.node, this.time.milliseconds()));
    }

    @Test
    public void testApiVersionsRequest() {
        this.client.ready(this.node, this.time.milliseconds());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        delayedApiVersionsResponse(0, ApiKeys.API_VERSIONS.latestVersion(), defaultApiVersionsResponse());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()));
    }

    @Test
    public void testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker() {
        this.client.ready(this.node, this.time.milliseconds());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, this.selector.completedSends().size());
        RequestHeader parseHeader = parseHeader(this.selector.completedSendBuffers().get(0).buffer());
        Assertions.assertEquals(ApiKeys.API_VERSIONS, parseHeader.apiKey());
        Assertions.assertEquals(3, parseHeader.apiVersion());
        ApiVersionsResponseData.ApiVersionCollection apiVersionCollection = new ApiVersionsResponseData.ApiVersionCollection();
        apiVersionCollection.add(new ApiVersionsResponseData.ApiVersion().setApiKey(ApiKeys.API_VERSIONS.id).setMinVersion((short) 0).setMaxVersion((short) 2));
        delayedApiVersionsResponse(0, (short) 0, new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()).setApiKeys(apiVersionCollection)));
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertEquals(1, this.selector.m789completedReceives().size());
        this.selector.completedSends().clear();
        this.selector.completedSendBuffers().clear();
        this.selector.m789completedReceives().clear();
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, this.selector.completedSends().size());
        RequestHeader parseHeader2 = parseHeader(this.selector.completedSendBuffers().get(0).buffer());
        Assertions.assertEquals(ApiKeys.API_VERSIONS, parseHeader2.apiKey());
        Assertions.assertEquals(2, parseHeader2.apiVersion());
        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertEquals(1, this.selector.m789completedReceives().size());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()));
    }

    @Test
    public void testUnsupportedApiVersionsRequestWithoutVersionProvidedByTheBroker() {
        this.client.ready(this.node, this.time.milliseconds());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, this.selector.completedSends().size());
        RequestHeader parseHeader = parseHeader(this.selector.completedSendBuffers().get(0).buffer());
        Assertions.assertEquals(ApiKeys.API_VERSIONS, parseHeader.apiKey());
        Assertions.assertEquals(3, parseHeader.apiVersion());
        delayedApiVersionsResponse(0, (short) 0, new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertEquals(1, this.selector.m789completedReceives().size());
        this.selector.completedSends().clear();
        this.selector.completedSendBuffers().clear();
        this.selector.m789completedReceives().clear();
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, this.selector.completedSends().size());
        RequestHeader parseHeader2 = parseHeader(this.selector.completedSendBuffers().get(0).buffer());
        Assertions.assertEquals(ApiKeys.API_VERSIONS, parseHeader2.apiKey());
        Assertions.assertEquals(0, parseHeader2.apiVersion());
        delayedApiVersionsResponse(1, (short) 0, defaultApiVersionsResponse());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertEquals(1, this.selector.m789completedReceives().size());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()));
    }

    @Test
    public void testRequestTimeout() {
        awaitReady(this.client, this.node);
        ClientRequest newClientRequest = this.client.newClientRequest(this.node.idString(), ProduceRequest.forCurrentMagic(new ProduceRequestData().setTopicData(new ProduceRequestData.TopicProduceDataCollection()).setAcks((short) 1).setTimeoutMs(1000)), this.time.milliseconds(), true, 6000, new TestCallbackHandler());
        Assertions.assertEquals(6000, newClientRequest.requestTimeoutMs());
        testRequestTimeout(newClientRequest);
    }

    @Test
    public void testDefaultRequestTimeout() {
        awaitReady(this.client, this.node);
        ClientRequest newClientRequest = this.client.newClientRequest(this.node.idString(), ProduceRequest.forCurrentMagic(new ProduceRequestData().setTopicData(new ProduceRequestData.TopicProduceDataCollection()).setAcks((short) 1).setTimeoutMs(1000)), this.time.milliseconds(), true);
        Assertions.assertEquals(1000, newClientRequest.requestTimeoutMs());
        testRequestTimeout(newClientRequest);
    }

    private void testRequestTimeout(ClientRequest clientRequest) {
        this.client.send(clientRequest, this.time.milliseconds());
        this.time.sleep(clientRequest.requestTimeoutMs() + 1);
        List poll = this.client.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(1, poll.size());
        ClientResponse clientResponse = (ClientResponse) poll.get(0);
        Assertions.assertEquals(this.node.idString(), clientResponse.destination());
        Assertions.assertTrue(clientResponse.wasDisconnected(), "Expected response to fail due to disconnection");
    }

    @Test
    public void testConnectionSetupTimeout() {
        Cluster clusterWith = TestUtils.clusterWith(2);
        Node nodeById = clusterWith.nodeById(0);
        Node nodeById2 = clusterWith.nodeById(1);
        this.client.ready(nodeById, this.time.milliseconds());
        this.selector.serverConnectionBlocked(nodeById.idString());
        this.client.ready(nodeById2, this.time.milliseconds());
        this.selector.serverConnectionBlocked(nodeById2.idString());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(this.client.connectionFailed(this.node), "The connections should not fail before the socket connection setup timeout elapsed");
        this.time.sleep(6001L);
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.connectionFailed(this.node), "Expected the connections to fail due to the socket connection setup timeout");
    }

    @Test
    public void testConnectionThrottling() {
        awaitReady(this.client, this.node);
        short latestVersion = ApiKeys.PRODUCE.latestVersion();
        ClientRequest newClientRequest = this.client.newClientRequest(this.node.idString(), new ProduceRequest.Builder(latestVersion, latestVersion, new ProduceRequestData().setAcks((short) 1).setTimeoutMs(1000)), this.time.milliseconds(), true, 1000, new TestCallbackHandler());
        this.client.send(newClientRequest, this.time.milliseconds());
        this.client.poll(1L, this.time.milliseconds());
        this.selector.completeReceive(new NetworkReceive(this.node.idString(), RequestTestUtils.serializeResponseWithHeader(new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(100)), latestVersion, newClientRequest.correlationId())));
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertFalse(this.client.ready(this.node, this.time.milliseconds()));
        Assertions.assertEquals(100L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
        this.time.sleep(50L);
        Assertions.assertFalse(this.client.ready(this.node, this.time.milliseconds()));
        Assertions.assertEquals(50L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
        this.time.sleep(50L);
        Assertions.assertTrue(this.client.ready(this.node, this.time.milliseconds()));
        Assertions.assertEquals(0L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
    }

    private ApiVersionsResponse createExpectedApiVersionsResponse(ApiKeys apiKeys, short s) {
        ApiVersionsResponseData.ApiVersionCollection apiVersionCollection = new ApiVersionsResponseData.ApiVersionCollection();
        for (ApiKeys apiKeys2 : ApiKeys.values()) {
            if (apiKeys2 == apiKeys) {
                apiVersionCollection.add(new ApiVersionsResponseData.ApiVersion().setApiKey(apiKeys2.id).setMinVersion((short) 0).setMaxVersion(s));
            } else {
                apiVersionCollection.add(ApiVersionsResponse.toApiVersion(apiKeys2));
            }
        }
        return new ApiVersionsResponse(new ApiVersionsResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0).setApiKeys(apiVersionCollection));
    }

    @Test
    public void testThrottlingNotEnabledForConnectionToOlderBroker() {
        setExpectedApiVersionsResponse(createExpectedApiVersionsResponse(ApiKeys.PRODUCE, (short) 5));
        while (!this.client.ready(this.node, this.time.milliseconds())) {
            this.client.poll(1L, this.time.milliseconds());
        }
        this.selector.clear();
        int sendEmptyProduceRequest = sendEmptyProduceRequest();
        this.client.poll(1L, this.time.milliseconds());
        sendThrottledProduceResponse(sendEmptyProduceRequest, 100, (short) 5);
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertTrue(this.client.ready(this.node, this.time.milliseconds()));
        Assertions.assertEquals(0L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
    }

    private int sendEmptyProduceRequest() {
        return sendEmptyProduceRequest(this.node.idString());
    }

    private int sendEmptyProduceRequest(String str) {
        ClientRequest newClientRequest = this.client.newClientRequest(str, ProduceRequest.forCurrentMagic(new ProduceRequestData().setTopicData(new ProduceRequestData.TopicProduceDataCollection()).setAcks((short) 1).setTimeoutMs(1000)), this.time.milliseconds(), true, 1000, new TestCallbackHandler());
        this.client.send(newClientRequest, this.time.milliseconds());
        return newClientRequest.correlationId();
    }

    private void sendResponse(AbstractResponse abstractResponse, short s, int i) {
        this.selector.completeReceive(new NetworkReceive(this.node.idString(), RequestTestUtils.serializeResponseWithHeader(abstractResponse, s, i)));
    }

    private void sendThrottledProduceResponse(int i, int i2, short s) {
        sendResponse(new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(i2)), s, i);
    }

    @Test
    public void testLeastLoadedNode() {
        this.client.ready(this.node, this.time.milliseconds());
        Assertions.assertFalse(this.client.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(this.node, this.client.leastLoadedNode(this.time.milliseconds()));
        awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()), "The client should be ready");
        Assertions.assertEquals(this.client.leastLoadedNode(this.time.milliseconds()).id(), this.node.id(), "There should be one leastloadednode");
        this.time.sleep(10000L);
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertFalse(this.client.ready(this.node, this.time.milliseconds()), "After we forced the disconnection the client is no longer ready.");
        Assertions.assertNull(this.client.leastLoadedNode(this.time.milliseconds()), "There should be NO leastloadednode");
    }

    @Test
    public void testLeastLoadedNodeProvideDisconnectedNodesPrioritizedByLastConnectionTimestamp() {
        NetworkClient createNetworkClientWithMultipleNodes = createNetworkClientWithMultipleNodes(0L, 5000L, 3);
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 3 * 10; i++) {
            Node leastLoadedNode = createNetworkClientWithMultipleNodes.leastLoadedNode(this.time.milliseconds());
            Assertions.assertNotNull(leastLoadedNode, "Should provide a node");
            hashSet.add(leastLoadedNode);
            createNetworkClientWithMultipleNodes.ready(leastLoadedNode, this.time.milliseconds());
            createNetworkClientWithMultipleNodes.disconnect(leastLoadedNode.idString());
            this.time.sleep(5001L);
            createNetworkClientWithMultipleNodes.poll(0L, this.time.milliseconds());
            if ((i + 1) % 3 == 0) {
                Assertions.assertEquals(3, hashSet.size(), "All the nodes should be provided");
                hashSet.clear();
            }
        }
    }

    @Test
    public void testAuthenticationFailureWithInFlightMetadataRequest() {
        MetadataResponse metadataUpdateWith = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
        Metadata metadata = new Metadata(50, 5000L, new LogContext(), new ClusterResourceListeners());
        metadata.updateWithCurrentRequestVersion(metadataUpdateWith, false, this.time.milliseconds());
        Cluster fetch = metadata.fetch();
        Node node = (Node) fetch.nodes().get(0);
        Node node2 = (Node) fetch.nodes().get(1);
        NetworkClient createNetworkClientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery(metadata);
        awaitReady(createNetworkClientWithNoVersionDiscovery, node);
        metadata.requestUpdate();
        this.time.sleep(50);
        createNetworkClientWithNoVersionDiscovery.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(Optional.of(node), fetch.nodes().stream().filter(node3 -> {
            return createNetworkClientWithNoVersionDiscovery.hasInFlightRequests(node3.idString());
        }).findFirst());
        Assertions.assertFalse(createNetworkClientWithNoVersionDiscovery.ready(node2, this.time.milliseconds()));
        this.selector.serverAuthenticationFailed(node2.idString());
        createNetworkClientWithNoVersionDiscovery.poll(0L, this.time.milliseconds());
        Assertions.assertNotNull(createNetworkClientWithNoVersionDiscovery.authenticationException(node2));
        RequestHeader parseHeader = parseHeader(this.selector.completedSendBuffers().get(0).buffer());
        Assertions.assertEquals(ApiKeys.METADATA, parseHeader.apiKey());
        this.selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), RequestTestUtils.serializeResponseWithHeader(metadataUpdateWith, parseHeader.apiVersion(), parseHeader.correlationId()))));
        int updateVersion = metadata.updateVersion();
        createNetworkClientWithNoVersionDiscovery.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(updateVersion + 1, metadata.updateVersion());
    }

    @Test
    public void testLeastLoadedNodeConsidersThrottledConnections() {
        this.client.ready(this.node, this.time.milliseconds());
        awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()), "The client should be ready");
        int sendEmptyProduceRequest = sendEmptyProduceRequest();
        this.client.poll(1L, this.time.milliseconds());
        sendThrottledProduceResponse(sendEmptyProduceRequest, 100, ApiKeys.PRODUCE.latestVersion());
        this.client.poll(1L, this.time.milliseconds());
        Assertions.assertNull(this.client.leastLoadedNode(this.time.milliseconds()));
    }

    @Test
    public void testConnectionDelayWithNoExponentialBackoff() {
        Assertions.assertEquals(0L, this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds()));
    }

    @Test
    public void testConnectionDelayConnectedWithNoExponentialBackoff() {
        awaitReady(this.clientWithNoExponentialBackoff, this.node);
        Assertions.assertEquals(Long.MAX_VALUE, this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds()));
    }

    @Test
    public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
        awaitReady(this.clientWithNoExponentialBackoff, this.node);
        this.selector.serverDisconnect(this.node.idString());
        this.clientWithNoExponentialBackoff.poll(1000L, this.time.milliseconds());
        long connectionDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(10000L, connectionDelay);
        this.time.sleep(connectionDelay);
        Assertions.assertEquals(0L, this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds()));
        this.client.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());
        Assertions.assertEquals(10000L, connectionDelay);
    }

    @Test
    public void testConnectionDelay() {
        Assertions.assertEquals(0L, this.client.connectionDelay(this.node, this.time.milliseconds()));
    }

    @Test
    public void testConnectionDelayConnected() {
        awaitReady(this.client, this.node);
        Assertions.assertEquals(Long.MAX_VALUE, this.client.connectionDelay(this.node, this.time.milliseconds()));
    }

    @Test
    public void testConnectionDelayDisconnected() {
        awaitReady(this.client, this.node);
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());
        long connectionDelay = this.client.connectionDelay(this.node, this.time.milliseconds());
        Assertions.assertEquals(10000L, connectionDelay, 10000 * 0.3d);
        this.time.sleep(connectionDelay);
        Assertions.assertEquals(0L, this.client.connectionDelay(this.node, this.time.milliseconds()));
        this.client.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());
        long round = Math.round((float) (connectionDelay * 2));
        Assertions.assertEquals(round, this.client.connectionDelay(this.node, this.time.milliseconds()), round * 0.6d);
    }

    @Test
    public void testDisconnectDuringUserMetadataRequest() {
        awaitReady(this.client, this.node);
        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
        long milliseconds = this.time.milliseconds();
        this.client.send(this.client.newClientRequest(this.node.idString(), builder, milliseconds, true), milliseconds);
        this.client.poll(1000L, milliseconds);
        Assertions.assertEquals(1, this.client.inFlightRequestCount(this.node.idString()));
        Assertions.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertTrue(this.client.hasInFlightRequests());
        this.selector.close(this.node.idString());
        List poll = this.client.poll(1000L, this.time.milliseconds());
        Assertions.assertEquals(1, poll.size());
        Assertions.assertTrue(((ClientResponse) poll.iterator().next()).wasDisconnected());
    }

    @Test
    public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
        double log = Math.log(100000.0d / Math.max(10000L, 1L)) / Math.log(2.0d);
        for (int i = 0; i < 5; i++) {
            this.selector.clear();
            awaitInFlightApiVersionRequest();
            this.selector.serverDisconnect(this.node.idString());
            List poll = this.client.poll(0L, this.time.milliseconds());
            Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
            Assertions.assertTrue(poll.isEmpty());
            long round = Math.round(Math.pow(2.0d, Math.min(i, log)) * 10000.0d);
            long connectionDelay = this.client.connectionDelay(this.node, this.time.milliseconds());
            Assertions.assertEquals(round, connectionDelay, 0.2d * round);
            if (i == 4) {
                return;
            }
            this.time.sleep(connectionDelay + 1);
        }
    }

    @Test
    public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception {
        awaitInFlightApiVersionRequest();
        this.client.disconnect(this.node.idString());
        Assertions.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assertions.assertTrue(this.client.poll(0L, this.time.milliseconds()).isEmpty());
    }

    @Test
    public void testDisconnectWithMultipleInFlights() {
        NetworkClient networkClient = this.clientWithNoVersionDiscovery;
        awaitReady(networkClient, this.node);
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()), "Expected NetworkClient to be ready to send to node " + this.node.idString());
        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
        long milliseconds = this.time.milliseconds();
        ArrayList arrayList = new ArrayList();
        arrayList.getClass();
        RequestCompletionHandler requestCompletionHandler = (v1) -> {
            r0.add(v1);
        };
        ClientRequest newClientRequest = networkClient.newClientRequest(this.node.idString(), builder, milliseconds, true, 1000, requestCompletionHandler);
        networkClient.send(newClientRequest, milliseconds);
        networkClient.poll(0L, milliseconds);
        ClientRequest newClientRequest2 = networkClient.newClientRequest(this.node.idString(), builder, milliseconds, true, 1000, requestCompletionHandler);
        networkClient.send(newClientRequest2, milliseconds);
        networkClient.poll(0L, milliseconds);
        Assertions.assertNotEquals(newClientRequest.correlationId(), newClientRequest2.correlationId());
        Assertions.assertEquals(2, networkClient.inFlightRequestCount());
        Assertions.assertEquals(2, networkClient.inFlightRequestCount(this.node.idString()));
        networkClient.disconnect(this.node.idString());
        List poll = networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(2, poll.size());
        Assertions.assertEquals(poll, arrayList);
        Assertions.assertEquals(0, networkClient.inFlightRequestCount());
        Assertions.assertEquals(0, networkClient.inFlightRequestCount(this.node.idString()));
        ClientResponse clientResponse = (ClientResponse) poll.get(0);
        Assertions.assertTrue(clientResponse.wasDisconnected());
        Assertions.assertEquals(newClientRequest.correlationId(), clientResponse.requestHeader().correlationId());
        ClientResponse clientResponse2 = (ClientResponse) poll.get(1);
        Assertions.assertTrue(clientResponse2.wasDisconnected());
        Assertions.assertEquals(newClientRequest2.correlationId(), clientResponse2.requestHeader().correlationId());
    }

    @Test
    public void testCallDisconnect() throws Exception {
        awaitReady(this.client, this.node);
        Assertions.assertTrue(this.client.isReady(this.node, this.time.milliseconds()), "Expected NetworkClient to be ready to send to node " + this.node.idString());
        Assertions.assertFalse(this.client.connectionFailed(this.node), "Did not expect connection to node " + this.node.idString() + " to be failed");
        this.client.disconnect(this.node.idString());
        Assertions.assertFalse(this.client.isReady(this.node, this.time.milliseconds()), "Expected node " + this.node.idString() + " to be disconnected.");
        Assertions.assertTrue(this.client.connectionFailed(this.node), "Expected connection to node " + this.node.idString() + " to be failed after disconnect");
        Assertions.assertFalse(this.client.canConnect(this.node, this.time.milliseconds()));
        this.time.sleep(100000L);
        Assertions.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));
        this.client.disconnect(this.node.idString());
        Assertions.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));
    }

    @Test
    public void testCorrelationId() {
        Set set = (Set) IntStream.range(0, 100).mapToObj(i -> {
            return Integer.valueOf(this.client.nextCorrelationId());
        }).collect(Collectors.toSet());
        Assertions.assertEquals(100, set.size());
        set.forEach(num -> {
            Assertions.assertTrue(num.intValue() < 2147483640);
        });
    }

    @Test
    public void testReconnectAfterAddressChange() {
        AddressChangeHostResolver addressChangeHostResolver = new AddressChangeHostResolver((InetAddress[]) initialAddresses.toArray(new InetAddress[0]), (InetAddress[]) newAddresses.toArray(new InetAddress[0]));
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        MockSelector mockSelector = new MockSelector(this.time, inetSocketAddress -> {
            InetAddress address = inetSocketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                atomicInteger.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                atomicInteger2.incrementAndGet();
            }
            return (addressChangeHostResolver.useNewAddresses() && newAddresses.contains(address)) || (!addressChangeHostResolver.useNewAddresses() && initialAddresses.contains(address));
        });
        NetworkClient networkClient = new NetworkClient(this.metadataUpdater, (Metadata) null, mockSelector, "mock", Integer.MAX_VALUE, 10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, this.time, false, new ApiVersions(), (Sensor) null, new LogContext(), addressChangeHostResolver);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()));
        addressChangeHostResolver.changeAddresses();
        mockSelector.serverDisconnect(this.node.idString());
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(networkClient.isReady(this.node, this.time.milliseconds()));
        this.time.sleep(100000L);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(1, atomicInteger2.get());
        Assertions.assertEquals(2, addressChangeHostResolver.resolutionCount());
    }

    @Test
    public void testFailedConnectionToFirstAddress() {
        AddressChangeHostResolver addressChangeHostResolver = new AddressChangeHostResolver((InetAddress[]) initialAddresses.toArray(new InetAddress[0]), (InetAddress[]) newAddresses.toArray(new InetAddress[0]));
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        NetworkClient networkClient = new NetworkClient(this.metadataUpdater, (Metadata) null, new MockSelector(this.time, inetSocketAddress -> {
            InetAddress address = inetSocketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                atomicInteger.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                atomicInteger2.incrementAndGet();
            }
            return atomicInteger.get() > 1;
        }), "mock", Integer.MAX_VALUE, 10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, this.time, false, new ApiVersions(), (Sensor) null, new LogContext(), addressChangeHostResolver);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(networkClient.isReady(this.node, this.time.milliseconds()));
        this.time.sleep(100000L);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(0, atomicInteger2.get());
        Assertions.assertEquals(1, addressChangeHostResolver.resolutionCount());
    }

    @Test
    public void testFailedConnectionToFirstAddressAfterReconnect() {
        AddressChangeHostResolver addressChangeHostResolver = new AddressChangeHostResolver((InetAddress[]) initialAddresses.toArray(new InetAddress[0]), (InetAddress[]) newAddresses.toArray(new InetAddress[0]));
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        MockSelector mockSelector = new MockSelector(this.time, inetSocketAddress -> {
            InetAddress address = inetSocketAddress.getAddress();
            if (initialAddresses.contains(address)) {
                atomicInteger.incrementAndGet();
            } else if (newAddresses.contains(address)) {
                atomicInteger2.incrementAndGet();
            }
            return initialAddresses.contains(address) || atomicInteger2.get() > 1;
        });
        NetworkClient networkClient = new NetworkClient(this.metadataUpdater, (Metadata) null, mockSelector, "mock", Integer.MAX_VALUE, 10000L, 100000L, 65536, 65536, 1000, 5000L, 127000L, this.time, false, new ApiVersions(), (Sensor) null, new LogContext(), addressChangeHostResolver);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()));
        addressChangeHostResolver.changeAddresses();
        mockSelector.serverDisconnect(this.node.idString());
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(networkClient.isReady(this.node, this.time.milliseconds()));
        this.time.sleep(100000L);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(networkClient.isReady(this.node, this.time.milliseconds()));
        this.time.sleep(100000L);
        networkClient.ready(this.node, this.time.milliseconds());
        this.time.sleep(127000L);
        networkClient.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(networkClient.isReady(this.node, this.time.milliseconds()));
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(2, atomicInteger2.get());
        Assertions.assertEquals(2, addressChangeHostResolver.resolutionCount());
    }

    @Test
    public void testCloseConnectingNode() {
        Cluster clusterWith = TestUtils.clusterWith(2);
        Node nodeById = clusterWith.nodeById(0);
        Node nodeById2 = clusterWith.nodeById(1);
        this.client.ready(nodeById, this.time.milliseconds());
        this.selector.serverConnectionBlocked(nodeById.idString());
        this.client.poll(1L, this.time.milliseconds());
        this.client.close(nodeById.idString());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, nodeById, this.time.milliseconds()));
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, nodeById2, this.time.milliseconds()));
        this.client.ready(nodeById2, this.time.milliseconds());
        this.selector.delayedReceive(new DelayedReceive(nodeById2.idString(), new NetworkReceive(nodeById2.idString(), RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0))));
        while (!this.client.ready(nodeById2, this.time.milliseconds())) {
            this.client.poll(1L, this.time.milliseconds());
        }
        Assertions.assertTrue(this.client.isReady(nodeById2, this.time.milliseconds()));
        this.selector.clear();
        this.client.ready(nodeById, this.time.milliseconds());
        this.selector.delayedReceive(new DelayedReceive(nodeById.idString(), new NetworkReceive(nodeById.idString(), RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1))));
        while (!this.client.ready(nodeById, this.time.milliseconds())) {
            this.client.poll(1L, this.time.milliseconds());
        }
        Assertions.assertTrue(this.client.isReady(nodeById, this.time.milliseconds()));
    }

    @Test
    public void testConnectionDoesNotRemainStuckInCheckingApiVersionsStateIfChannelNeverBecomesReady() {
        Node nodeById = TestUtils.clusterWith(1).nodeById(0);
        this.client.ready(nodeById, this.time.milliseconds());
        this.selector.channelNotReady(nodeById.idString());
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertFalse(NetworkClientUtils.isReady(this.client, nodeById, this.time.milliseconds()));
        this.time.sleep(6001L);
        this.client.poll(0L, this.time.milliseconds());
        Assertions.assertTrue(this.client.connectionFailed(nodeById));
    }

    private RequestHeader parseHeader(ByteBuffer byteBuffer) {
        byteBuffer.getInt();
        return RequestHeader.parse(byteBuffer.slice());
    }

    private void awaitInFlightApiVersionRequest() throws Exception {
        this.client.ready(this.node, this.time.milliseconds());
        TestUtils.waitForCondition(() -> {
            this.client.poll(0L, this.time.milliseconds());
            return this.client.hasInFlightRequests(this.node.idString());
        }, 1000L, "");
        Assertions.assertFalse(this.client.isReady(this.node, this.time.milliseconds()));
    }

    private ApiVersionsResponse defaultApiVersionsResponse() {
        return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
    }

    static {
        try {
            initialAddresses = new ArrayList<>(Arrays.asList(InetAddress.getByName("10.200.20.100"), InetAddress.getByName("10.200.20.101"), InetAddress.getByName("10.200.20.102")));
            newAddresses = new ArrayList<>(Arrays.asList(InetAddress.getByName("10.200.20.103"), InetAddress.getByName("10.200.20.104"), InetAddress.getByName("10.200.20.105")));
        } catch (UnknownHostException e) {
            Assertions.fail("Attempted to create an invalid InetAddress, this should not happen");
        }
    }
}
