package org.apache.kafka.clients;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/NetworkClientTest.class */
/**
 * Unit tests for {@link NetworkClient}.
 *
 * <p>The client under test is wired to a {@link MockSelector} and a {@link MockTime}, and its
 * {@link Metadata} is seeded with a one-node cluster before every test.
 */
public class NetworkClientTest {
    // NOTE: the initializers below depend on each other, so declaration order matters.
    private final MockTime time = new MockTime();
    private final MockSelector selector = new MockSelector(time);
    private final Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    private final int nodeId = 1;
    private final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
    private final Node node = cluster.nodes().get(0);
    private final NetworkClient client =
            new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);

    /** Seeds the metadata with the test cluster at the current mock time. */
    @Before
    public void setup() {
        metadata.update(cluster, time.milliseconds());
    }

    /**
     * Walks the client through not-ready, ready, and not-ready-again (after a forced disconnect),
     * and checks that the disconnect makes a metadata update due immediately.
     */
    @Test
    public void testReadyAndDisconnect() {
        List<ClientRequest> noRequests = new ArrayList<ClientRequest>();

        Assert.assertFalse("Client begins unready as it has no connection.",
                client.ready(node, time.milliseconds()));
        Assert.assertEquals("The connection is established as a side-effect of the readiness check",
                1L, selector.connected().size());

        client.poll(noRequests, 1L, time.milliseconds());
        selector.clear();
        Assert.assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));

        selector.disconnect(node.id());
        client.poll(noRequests, 1L, time.milliseconds());
        selector.clear();
        Assert.assertFalse("After we forced the disconnection the client is no longer ready.",
                client.ready(node, time.milliseconds()));
        // assertEquals (rather than assertTrue(x == 0)) reports the actual value on failure.
        Assert.assertEquals("Metadata should get updated.",
                0L, metadata.timeToNextUpdate(time.milliseconds()));
    }

    /**
     * Polling with a request for a destination that was never passed to
     * {@code ready(...)} is expected to fail with {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testSendToUnreadyNode() {
        // Destination id differs from nodeId and no readiness check has been made for it.
        int unreadyNodeId = 5;
        RequestSend send = new RequestSend(
                unreadyNodeId,
                client.nextRequestHeader(ApiKeys.METADATA),
                new MetadataRequest(Arrays.asList("test")).toStruct());
        ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
        client.poll(Arrays.asList(request), 1L, time.milliseconds());
    }

    /**
     * Sends one produce request, feeds a matching response through the mock selector, and checks
     * that the next poll returns exactly one response tied to the original request.
     */
    @Test
    public void testSimpleRequestResponse() {
        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.emptyMap());
        RequestHeader requestHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
        RequestSend send = new RequestSend(node.id(), requestHeader, produceRequest.toStruct());
        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);

        awaitReady(client, node);
        client.poll(Arrays.asList(request), 1L, time.milliseconds());
        Assert.assertEquals(1L, client.inFlightRequestCount());

        // Build the response bytes: a header carrying the request's correlation id, followed by
        // a produce response body with an empty "responses" array.
        ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
        Struct responseBody = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
        responseBody.set("responses", new Object[0]);
        ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + responseBody.sizeOf());
        responseHeader.writeTo(buffer);
        responseBody.writeTo(buffer);
        buffer.flip(); // switch the buffer from writing to reading before handing it over

        selector.completeReceive(new NetworkReceive(node.id(), buffer));
        List<ClientResponse> responses =
                client.poll(new ArrayList<ClientRequest>(), 1L, time.milliseconds());

        Assert.assertEquals(1L, responses.size());
        ClientResponse response = responses.get(0);
        Assert.assertTrue("Should have a response body.", response.hasResponse());
        Assert.assertEquals("Should be correlated to the original request", request, response.request());
    }

    /**
     * Polls the given client with no requests until it reports the node as ready.
     *
     * @param networkClient the client to poll
     * @param node the node whose readiness is awaited
     */
    private void awaitReady(NetworkClient networkClient, Node node) {
        while (!networkClient.ready(node, time.milliseconds())) {
            networkClient.poll(new ArrayList<ClientRequest>(), 1L, time.milliseconds());
        }
    }
}
