package org.apache.kafka.clients;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest.class */
public class NetworkClientTest {
    /** Request timeout in milliseconds (1 second). */
    protected final int requestTimeoutMs = 1000;
    /** Mock clock shared by the selector and the clients built below. */
    protected final MockTime time = new MockTime();
    /** Mock selector shared by every client created in this test. */
    protected final MockSelector selector = new MockSelector(this.time);
    protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    protected final int nodeId = 1;
    protected final Cluster cluster = TestUtils.singletonCluster("test", 1);
    /** First (index 0) node of {@link #cluster}; the broker every test talks to. */
    protected final Node node = this.cluster.nodes().get(0);
    /** Base reconnect backoff in milliseconds. */
    protected final long reconnectBackoffMsTest = 10000;
    /** Maximum reconnect backoff in milliseconds. */
    protected final long reconnectBackoffMaxMsTest = 100000;
    /** Client whose backoff ceiling is {@link #reconnectBackoffMaxMsTest}. */
    private final NetworkClient client = createNetworkClient(this.reconnectBackoffMaxMsTest);
    /**
     * Client whose backoff ceiling equals the base backoff
     * ({@link #reconnectBackoffMsTest}); as the name says, this is meant to
     * disable exponential growth.
     */
    private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(this.reconnectBackoffMsTest);
    private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
    private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest$TestCallbackHandler.class */
    public static class TestCallbackHandler implements RequestCompletionHandler {
        public boolean executed;
        public ClientResponse response;

        private TestCallbackHandler() {
            this.executed = false;
        }

        public void onComplete(ClientResponse clientResponse) {
            this.executed = true;
            this.response = clientResponse;
        }
    }

    /**
     * Builds a client on the shared mock selector and metadata, with broker
     * version discovery enabled.
     *
     * @param j maximum reconnect backoff in milliseconds; the base backoff is
     *          always {@link #reconnectBackoffMsTest}
     * @return a new client with client id {@code "mock"}
     */
    private NetworkClient createNetworkClient(long j) {
        return new NetworkClient(
                this.selector,
                this.metadata,
                "mock",
                Integer.MAX_VALUE,              // max in-flight requests per connection
                this.reconnectBackoffMsTest,    // base reconnect backoff
                j,                              // maximum reconnect backoff
                64 * 1024,                      // socket send buffer
                64 * 1024,                      // socket receive buffer
                this.requestTimeoutMs,
                this.time,
                true,                           // discover broker versions
                new ApiVersions(),
                new LogContext());
    }

    /**
     * Builds a client that uses a {@link ManualMetadataUpdater} holding only
     * {@link #node}, instead of the shared {@link #metadata}. Both reconnect
     * backoffs are zero.
     *
     * @return a new client with client id {@code "mock-static"}
     */
    private NetworkClient createNetworkClientWithStaticNodes() {
        return new NetworkClient(
                this.selector,
                new ManualMetadataUpdater(Arrays.asList(this.node)),
                "mock-static",
                Integer.MAX_VALUE,      // max in-flight requests per connection
                0L,                     // base reconnect backoff
                0L,                     // maximum reconnect backoff
                64 * 1024,              // socket send buffer
                64 * 1024,              // socket receive buffer
                this.requestTimeoutMs,
                this.time,
                true,                   // discover broker versions
                new ApiVersions(),
                new LogContext());
    }

    /**
     * Builds a client configured like the default one but with broker version
     * discovery switched off.
     *
     * @return a new client with client id {@code "mock"}
     */
    private NetworkClient createNetworkClientWithNoVersionDiscovery() {
        return new NetworkClient(
                this.selector,
                this.metadata,
                "mock",
                Integer.MAX_VALUE,                  // max in-flight requests per connection
                this.reconnectBackoffMsTest,        // base reconnect backoff
                this.reconnectBackoffMaxMsTest,     // maximum reconnect backoff
                64 * 1024,                          // socket send buffer
                64 * 1024,                          // socket receive buffer
                this.requestTimeoutMs,
                this.time,
                false,                              // do not discover broker versions
                new ApiVersions(),
                new LogContext());
    }

    /** Seeds the shared metadata with the single-node test cluster before each test. */
    @Before
    public void setup() {
        long now = time.milliseconds();
        metadata.update(cluster, Collections.emptySet(), now);
    }

    /**
     * Sends a metadata request to destination "5" without ever calling
     * {@code ready} for it; the test expects an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testSendToUnreadyNode() {
        MetadataRequest.Builder metadataBuilder = new MetadataRequest.Builder(Arrays.asList("test"), true);
        long now = time.milliseconds();
        ClientRequest request = client.newClientRequest("5", metadataBuilder, now, false);
        client.send(request, now);
        client.poll(1L, time.milliseconds());
    }

    /** Runs the request/response round trip against the default client. */
    @Test
    public void testSimpleRequestResponse() {
        NetworkClient underTest = client;
        checkSimpleRequestResponse(underTest);
    }

    /** Runs the request/response round trip against the static-node client. */
    @Test
    public void testSimpleRequestResponseWithStaticNodes() {
        NetworkClient underTest = clientWithStaticNodes;
        checkSimpleRequestResponse(underTest);
    }

    /** Runs the request/response round trip against the client without version discovery. */
    @Test
    public void testSimpleRequestResponseWithNoBrokerDiscovery() {
        NetworkClient underTest = clientWithNoVersionDiscovery;
        checkSimpleRequestResponse(underTest);
    }

    /**
     * Verifies that closing a node's connection drops its in-flight requests
     * and leaves the node not ready.
     */
    @Test
    public void testClose() {
        String nodeConnectionId = node.idString();

        client.ready(node, time.milliseconds());
        awaitReady(client, node);
        client.poll(1L, time.milliseconds());
        Assert.assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));

        // Put exactly one request in flight.
        ProduceRequest.Builder produceBuilder =
                ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest request = client.newClientRequest(nodeConnectionId, produceBuilder, time.milliseconds(), true);
        client.send(request, time.milliseconds());
        Assert.assertEquals("There should be 1 in-flight request after send", 1L,
                client.inFlightRequestCount(nodeConnectionId));
        Assert.assertTrue(client.hasInFlightRequests(nodeConnectionId));
        Assert.assertTrue(client.hasInFlightRequests());

        client.close(nodeConnectionId);

        Assert.assertEquals("There should be no in-flight request after close", 0L,
                client.inFlightRequestCount(nodeConnectionId));
        Assert.assertFalse(client.hasInFlightRequests(nodeConnectionId));
        Assert.assertFalse(client.hasInFlightRequests());
        Assert.assertFalse("Connection should not be ready after close", client.isReady(node, 0L));
    }

    /**
     * Sends one produce request through {@code networkClient}, feeds a
     * hand-serialized produce response back through the mock selector, and
     * checks that the callback handler receives it.
     *
     * @param networkClient the client under test
     */
    private void checkSimpleRequestResponse(NetworkClient networkClient) {
        awaitReady(networkClient, node);

        String destination = node.idString();
        ProduceRequest.Builder produceBuilder =
                ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        TestCallbackHandler handler = new TestCallbackHandler();
        ClientRequest request =
                networkClient.newClientRequest(destination, produceBuilder, time.milliseconds(), true, handler);
        networkClient.send(request, time.milliseconds());
        networkClient.poll(1L, time.milliseconds());
        Assert.assertEquals(1L, networkClient.inFlightRequestCount());

        // Serialize a response header (same correlation id) followed by an empty produce response body.
        Struct headerStruct = new ResponseHeader(request.correlationId()).toStruct();
        Struct bodyStruct = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
        bodyStruct.set("responses", new Object[0]);
        ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
        headerStruct.writeTo(buffer);
        bodyStruct.writeTo(buffer);
        buffer.flip();
        selector.completeReceive(new NetworkReceive(destination, buffer));

        List<ClientResponse> responses = networkClient.poll(1L, time.milliseconds());
        Assert.assertEquals(1L, responses.size());
        Assert.assertTrue("The handler should have executed.", handler.executed);
        Assert.assertTrue("Should have a response body.", handler.response.hasResponse());
        Assert.assertEquals("Should be correlated to the original request",
                request.correlationId(), handler.response.requestHeader().correlationId());
    }

    /**
     * Queues a delayed receive on the mock selector that carries the default
     * ApiVersions response for {@link #node}, serialized at the maximum
     * API_VERSIONS version that response advertises, with correlation id 0.
     */
    private void setExpectedApiVersionsResponse() {
        String source = node.idString();
        ApiVersionsResponse response = ApiVersionsResponse.defaultApiVersionsResponse();
        short maxVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
        ByteBuffer payload = response.serialize(maxVersion, new ResponseHeader(0));
        NetworkReceive receive = new NetworkReceive(source, payload);
        selector.delayedReceive(new DelayedReceive(source, receive));
    }

    /**
     * Polls {@code networkClient} until it reports {@code node} as ready, then
     * clears the mock selector. When the client discovers broker versions, an
     * ApiVersions response is queued first.
     *
     * @param networkClient the client to drive
     * @param node          the node to wait for
     */
    private void awaitReady(NetworkClient networkClient, Node node) {
        if (networkClient.discoverBrokerVersions()) {
            setExpectedApiVersionsResponse();
        }
        boolean ready = networkClient.ready(node, time.milliseconds());
        while (!ready) {
            networkClient.poll(1L, time.milliseconds());
            ready = networkClient.ready(node, time.milliseconds());
        }
        selector.clear();
    }

    /**
     * Sends a request, advances the mock clock by 3 seconds (the request
     * timeout is 1 second), and expects the next poll to return a single
     * disconnected response for the node.
     */
    @Test
    public void testRequestTimeout() {
        awaitReady(this.client, this.node);
        ProduceRequest.Builder produceBuilder =
                ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        TestCallbackHandler handler = new TestCallbackHandler();
        long now = this.time.milliseconds();
        ClientRequest request = this.client.newClientRequest(this.node.idString(), produceBuilder, now, true, handler);
        this.client.send(request, now);

        // Let the request sit well past the request timeout before polling again.
        long toWait = 3000L;
        this.time.sleep(toWait);
        List<ClientResponse> responses = this.client.poll(toWait, this.time.milliseconds());

        Assert.assertEquals(1L, responses.size());
        ClientResponse clientResponse = responses.get(0);
        Assert.assertEquals(this.node.idString(), clientResponse.destination());
        Assert.assertTrue("Expected response to fail due to disconnection", clientResponse.wasDisconnected());
    }

    /**
     * Checks that the only node is the least-loaded node while connected, and
     * that no least-loaded node is reported right after a server-side
     * disconnect.
     */
    @Test
    public void testLeastLoadedNode() {
        this.client.ready(this.node, this.time.milliseconds());
        awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assert.assertTrue("The client should be ready", this.client.isReady(this.node, this.time.milliseconds()));

        // Expected value first, so a failure message reads the right way round.
        Node leastLoaded = this.client.leastLoadedNode(this.time.milliseconds());
        Assert.assertEquals("There should be one leastloadednode", this.node.id(), leastLoaded.id());

        // Advance the clock by the base reconnect backoff, then drop the connection server-side.
        this.time.sleep(this.reconnectBackoffMsTest);
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1L, this.time.milliseconds());
        Assert.assertFalse("After we forced the disconnection the client is no longer ready.",
                this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertNull("There should be NO leastloadednode",
                this.client.leastLoadedNode(this.time.milliseconds()));
    }

    /** Before any connection attempt, the no-exponential-backoff client reports zero delay. */
    @Test
    public void testConnectionDelayWithNoExponentialBackoff() {
        long now = time.milliseconds();
        long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
        Assert.assertEquals(0L, delay);
    }

    /** Once the node is ready, the no-exponential-backoff client reports {@code Long.MAX_VALUE} delay. */
    @Test
    public void testConnectionDelayConnectedWithNoExponentialBackoff() {
        awaitReady(clientWithNoExponentialBackoff, node);
        long now = time.milliseconds();
        long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
        Assert.assertEquals(Long.MAX_VALUE, delay);
    }

    /**
     * With exponential backoff disabled, every disconnect must be followed by
     * the same connection delay: exactly the base reconnect backoff.
     *
     * <p>The second phase previously drove {@code client} (the
     * exponential-backoff client) and re-asserted the delay captured in the
     * first phase, so it checked nothing. It now exercises
     * {@code clientWithNoExponentialBackoff} and reads a fresh delay.
     */
    @Test
    public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
        awaitReady(this.clientWithNoExponentialBackoff, this.node);

        // First disconnect: delay equals the base backoff.
        this.selector.serverDisconnect(this.node.idString());
        this.clientWithNoExponentialBackoff.poll(this.requestTimeoutMs, this.time.milliseconds());
        long firstDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assert.assertEquals(this.reconnectBackoffMsTest, firstDelay);

        // Sleep until there is no connection delay left.
        this.time.sleep(firstDelay);
        Assert.assertEquals(0L,
                this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting again and disconnect before the connection is established.
        this.clientWithNoExponentialBackoff.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.clientWithNoExponentialBackoff.poll(this.requestTimeoutMs, this.time.milliseconds());

        // Second attempt must see the same delay, since the backoff does not grow.
        long secondDelay = this.clientWithNoExponentialBackoff.connectionDelay(this.node, this.time.milliseconds());
        Assert.assertEquals(this.reconnectBackoffMsTest, secondDelay);
    }

    /** Before any connection attempt, the default client reports zero delay. */
    @Test
    public void testConnectionDelay() {
        long now = time.milliseconds();
        long delay = client.connectionDelay(node, now);
        Assert.assertEquals(0L, delay);
    }

    /** Once the node is ready, the default client reports {@code Long.MAX_VALUE} delay. */
    @Test
    public void testConnectionDelayConnected() {
        awaitReady(client, node);
        long now = time.milliseconds();
        long delay = client.connectionDelay(node, now);
        Assert.assertEquals(Long.MAX_VALUE, delay);
    }

    /**
     * Checks the connection delay of the default client after two consecutive
     * disconnects: the first delay must be the base backoff within 20%, the
     * second must be twice the first delay within 40%.
     */
    @Test
    public void testConnectionDelayDisconnected() {
        awaitReady(this.client, this.node);

        // First disconnect: delay is the base backoff, allowing a 20% tolerance.
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(this.requestTimeoutMs, this.time.milliseconds());
        long delay = this.client.connectionDelay(this.node, this.time.milliseconds());
        Assert.assertEquals(this.reconnectBackoffMsTest, delay, this.reconnectBackoffMsTest * 0.2d);

        // Sleep until there is no connection delay left.
        this.time.sleep(delay);
        Assert.assertEquals(0L, this.client.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting again and disconnect before the connection is established.
        this.client.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(this.requestTimeoutMs, this.time.milliseconds());

        // Second attempt: twice the delay with twice the tolerance. Plain long
        // arithmetic replaces the former round-trip through float.
        long expectedDelay = delay * 2;
        Assert.assertEquals(expectedDelay, this.client.connectionDelay(this.node, this.time.milliseconds()),
                expectedDelay * 0.4d);
    }

    /**
     * Sends a metadata request, closes the connection on the selector while it
     * is in flight, and expects the next poll to return a single disconnected
     * response.
     */
    @Test
    public void testDisconnectDuringUserMetadataRequest() {
        awaitReady(this.client, this.node);

        MetadataRequest.Builder metadataBuilder =
                new MetadataRequest.Builder(Collections.<String>emptyList(), true);
        long now = this.time.milliseconds();
        ClientRequest request = this.client.newClientRequest(this.node.idString(), metadataBuilder, now, true);
        this.client.send(request, now);
        this.client.poll(this.requestTimeoutMs, now);
        Assert.assertEquals(1L, this.client.inFlightRequestCount(this.node.idString()));
        Assert.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        Assert.assertTrue(this.client.hasInFlightRequests());

        this.selector.close(this.node.idString());
        List<ClientResponse> responses = this.client.poll(this.requestTimeoutMs, this.time.milliseconds());
        Assert.assertEquals(1L, responses.size());
        Assert.assertTrue(responses.iterator().next().wasDisconnected());
    }

    /**
     * After a server-side disconnect while the request set up by
     * {@code awaitInFlightApiVersionRequest} is in flight, the client must
     * have no in-flight requests for the node and the poll must return no
     * responses.
     */
    @Test
    public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
        awaitInFlightApiVersionRequest();
        this.selector.serverDisconnect(this.node.idString());

        List<ClientResponse> responses = this.client.poll(0L, this.time.milliseconds());
        Assert.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        Assert.assertTrue(responses.isEmpty());
    }

    /**
     * After a client-side disconnect while the request set up by
     * {@code awaitInFlightApiVersionRequest} is in flight, the client must
     * have no in-flight requests for the node and the next poll must return
     * no responses.
     */
    @Test
    public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception {
        awaitInFlightApiVersionRequest();
        String nodeConnectionId = node.idString();
        client.disconnect(nodeConnectionId);
        Assert.assertFalse(client.hasInFlightRequests(node.idString()));

        List<ClientResponse> responses = client.poll(0L, time.milliseconds());
        Assert.assertTrue(responses.isEmpty());
    }

    /**
     * Checks the state reported around explicit {@code disconnect} calls:
     * ready before, failed and not connectable right after, connectable again
     * once 100000 ms have passed, and still connectable after a second
     * disconnect.
     */
    @Test
    public void testCallDisconnect() throws Exception {
        awaitReady(client, node);
        Assert.assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
                client.isReady(node, time.milliseconds()));
        Assert.assertFalse("Did not expect connection to node " + node.idString() + " to be failed",
                client.connectionFailed(node));

        client.disconnect(node.idString());
        Assert.assertFalse("Expected node " + node.idString() + " to be disconnected.",
                client.isReady(node, time.milliseconds()));
        Assert.assertTrue("Expected connection to node " + node.idString() + " to be failed after disconnect",
                client.connectionFailed(node));
        boolean connectableRightAfterDisconnect = client.canConnect(node, time.milliseconds());
        Assert.assertFalse(connectableRightAfterDisconnect);

        // 100000 ms is the same value as reconnectBackoffMaxMsTest.
        time.sleep(100000L);
        boolean connectableAfterSleep = client.canConnect(node, time.milliseconds());
        Assert.assertTrue(connectableAfterSleep);

        // A second disconnect must not make the node unconnectable again.
        client.disconnect(node.idString());
        boolean connectableAfterSecondDisconnect = client.canConnect(node, time.milliseconds());
        Assert.assertTrue(connectableAfterSecondDisconnect);
    }

    /**
     * Starts connecting to {@link #node} and polls until the client reports an
     * in-flight request for it (presumably the internal ApiVersions request,
     * going by the method name), then asserts the node is not yet ready.
     *
     * @throws Exception propagated from {@code TestUtils.waitForCondition}
     */
    private void awaitInFlightApiVersionRequest() throws Exception {
        client.ready(node, time.milliseconds());

        TestCondition requestInFlight = new TestCondition() {
            @Override
            public boolean conditionMet() {
                client.poll(0L, time.milliseconds());
                return client.hasInFlightRequests(node.idString());
            }
        };
        TestUtils.waitForCondition(requestInFlight, 1000L, "");

        Assert.assertFalse(client.isReady(node, time.milliseconds()));
    }
}
