package org.apache.kafka.common.network;

import io.confluent.kafka.multitenant.InetAddressToTenantContext;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import javax.net.ssl.SNIHostName;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditEventStatus;
import org.apache.kafka.server.audit.AuthenticationErrorInfo;
import org.apache.kafka.server.interceptor.DefaultBrokerInterceptor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/common/network/KafkaChannelTest.class */
public class KafkaChannelTest {
    public static final ChannelMetadataRegistry METADATA_REGISTRY = (ChannelMetadataRegistry) Mockito.mock(ChannelMetadataRegistry.class);
    public static final MemoryPool POOL = (MemoryPool) Mockito.mock(MemoryPool.class);
    public static final TransportLayer TRANSPORT = (TransportLayer) Mockito.mock(TransportLayer.class);
    public static final Authenticator AUTHENTICATOR = (Authenticator) Mockito.mock(Authenticator.class);
    public static final SslTransportLayer SSL_TRANSPORT_LAYER = (SslTransportLayer) Mockito.mock(SslTransportLayer.class);
    public static final SaslServerAuthenticator SASL_SERVER_AUTHENTICATOR = (SaslServerAuthenticator) Mockito.mock(SaslServerAuthenticator.class);
    public static final Time MOCK_TIME = new MockTime();
    public static final String CHANNEL_ID = "0";
    public static final int MAX_RECEIVE_SIZE = 1024;
    public static final KafkaChannel KAFKA_CHANNEL = new KafkaChannel(CHANNEL_ID, TRANSPORT, () -> {
        return AUTHENTICATOR;
    }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, false, false);
    public static final SNIHostName RAW_SNI_HOST_NAME = new SNIHostName("hostName");
    public static final PathAwareSniHostName SNI_HOST_NAME = new PathAwareSniHostName("hostName");
    public static final SNIHostName SNI_HOST_NAME_WITH_LKC = new SNIHostName("lkc-2v531-00aa-usw1-az1-lg1y3.us-west-1.aws.confluent.cloud");
    public static final InetAddress LOOPBACK_SOCKET_ADDRESS = InetAddress.getLoopbackAddress();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/KafkaChannelTest$ChannelReadinessMock.class */
    public class ChannelReadinessMock {
        Duration registrationTime;
        Duration handshakeTime;
        Duration authenticationTime;
        boolean shouldHandleProxy;
        boolean shouldHandshake;
        boolean shouldAuthenticate;
        boolean proxyHandlingCompleted;
        boolean handshakeCompleted;
        boolean authenticationCompleted;

        public ChannelReadinessMock() {
            this.registrationTime = KafkaChannelTest.this.randomTimeDuration();
            this.handshakeTime = KafkaChannelTest.this.randomTimeDuration();
            this.authenticationTime = KafkaChannelTest.this.randomTimeDuration();
        }
    }

    @AfterEach
    public void tearDown() {
        Mockito.reset(new Object[]{TRANSPORT, AUTHENTICATOR, POOL, METADATA_REGISTRY, SSL_TRANSPORT_LAYER, SASL_SERVER_AUTHENTICATOR});
    }

    @Test
    public void testPrepareSetSniHostName() throws IOException {
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.sniHostName()).thenReturn(Optional.of(RAW_SNI_HOST_NAME));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, false, (String) null);
        kafkaChannel.prepare();
        ((SslTransportLayer) Mockito.verify(SSL_TRANSPORT_LAYER, Mockito.times(3))).ready();
        ((SslTransportLayer) Mockito.verify(SSL_TRANSPORT_LAYER)).sniHostName();
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).setSniHostName(SNI_HOST_NAME);
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).authenticate();
        Assertions.assertEquals(1, kafkaChannel.metrics().successfulAuthentications());
    }

    @Test
    public void testPrepareSetLkcId() throws IOException {
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.lkcId()).thenReturn(Optional.of("lkc-id"));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, false, true, (String) null);
        kafkaChannel.prepare();
        ((SslTransportLayer) Mockito.verify(SSL_TRANSPORT_LAYER, Mockito.times(3))).ready();
        ((SslTransportLayer) Mockito.verify(SSL_TRANSPORT_LAYER)).lkcId();
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).putSaslServerPropertyIfAbsent("__confluent_logical_cluster_id", "lkc-id");
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).authenticate();
        Assertions.assertEquals(1, kafkaChannel.metrics().successfulAuthentications());
    }

    @Test
    public void testChannelLkcIdFromProxyProtocol() throws IOException {
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(false).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.lkcId()).thenReturn(Optional.of("lkc-proxy"));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, false, true, (String) null);
        kafkaChannel.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelLkcState.LKC_READY, kafkaChannel.lkcState());
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_READY, kafkaChannel.proxyState());
        Assertions.assertEquals("lkc-proxy", kafkaChannel.lkcId());
    }

    @Test
    public void testChannelLkcIdFromSniHeader() throws IOException {
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.lkcId()).thenReturn(Optional.of("lkc-proxy"));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.sniHostName()).thenReturn(Optional.of(SNI_HOST_NAME_WITH_LKC));
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, true, (String) null);
        kafkaChannel.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelLkcState.LKC_READY, kafkaChannel.lkcState());
        Assertions.assertEquals(KafkaChannel.ChannelSniState.SNI_READY, kafkaChannel.sniState());
        Assertions.assertEquals("lkc-2v531", kafkaChannel.lkcId());
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(false);
        Mockito.when(SSL_TRANSPORT_LAYER.lkcId()).thenReturn(Optional.empty());
        Mockito.when(SSL_TRANSPORT_LAYER.sniHostName()).thenReturn(Optional.of(SNI_HOST_NAME_WITH_LKC));
        KafkaChannel kafkaChannel2 = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, false, (String) null);
        kafkaChannel2.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelLkcState.LKC_READY, kafkaChannel2.lkcState());
        Assertions.assertEquals(KafkaChannel.ChannelSniState.SNI_READY, kafkaChannel2.sniState());
        Assertions.assertEquals("lkc-2v531", kafkaChannel2.lkcId());
    }

    @Test
    public void testInetAddressToTenantMap() throws IOException {
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("User:*", new TenantMetadata("tenant", "clusterId"));
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.sniHostName()).thenReturn(Optional.of(RAW_SNI_HOST_NAME));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        Mockito.when(SASL_SERVER_AUTHENTICATOR.principal()).thenReturn(multiTenantPrincipal);
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, false, (String) null);
        InetAddressToTenantContext inetAddressToTenantContext = new InetAddressToTenantContext();
        inetAddressToTenantContext.setTrackInetAddressToTenantEnabled(true);
        kafkaChannel.setInetAddressToTenantContext(inetAddressToTenantContext);
        Mockito.when(kafkaChannel.socketAddress()).thenReturn(LOOPBACK_SOCKET_ADDRESS);
        kafkaChannel.maybeTrackInetAddressToTenant();
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).principal();
        Assertions.assertEquals(1, kafkaChannel.getInetAddressToTenantContext().getInetAddressToTenant().size());
        Assertions.assertEquals("clusterId", kafkaChannel.getInetAddressToTenantContext().getInetAddressToTenant().get(LOOPBACK_SOCKET_ADDRESS));
    }

    @Test
    public void testInetAddressToAPIKeyMap() throws IOException {
        SaslAuthenticationException saslAuthenticationException = new SaslAuthenticationException("Fail Auth", new AuthenticationErrorInfo(AuditEventStatus.FAILURE, "error Message", "identifier", "clusterId"));
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.proxyProtocolReady())).thenReturn(true);
        Mockito.when(Boolean.valueOf(SSL_TRANSPORT_LAYER.ready())).thenReturn(true);
        Mockito.when(SSL_TRANSPORT_LAYER.sniHostName()).thenReturn(Optional.of(RAW_SNI_HOST_NAME));
        Mockito.when(Boolean.valueOf(SASL_SERVER_AUTHENTICATOR.complete())).thenReturn(false).thenReturn(true);
        ((SaslServerAuthenticator) Mockito.doAnswer(invocationOnMock -> {
            throw saslAuthenticationException;
        }).when(SASL_SERVER_AUTHENTICATOR)).authenticate();
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, SSL_TRANSPORT_LAYER, () -> {
            return SASL_SERVER_AUTHENTICATOR;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, false, (String) null);
        InetAddressToTenantContext inetAddressToTenantContext = new InetAddressToTenantContext();
        inetAddressToTenantContext.setTrackInetAddressToAPIKeyEnabled(true);
        kafkaChannel.setInetAddressToTenantContext(inetAddressToTenantContext);
        Mockito.when(kafkaChannel.socketAddress()).thenReturn(LOOPBACK_SOCKET_ADDRESS);
        kafkaChannel.getClass();
        Assertions.assertThrows(AuthenticationException.class, kafkaChannel::prepare);
        ((SaslServerAuthenticator) Mockito.verify(SASL_SERVER_AUTHENTICATOR)).authenticate();
        Assertions.assertEquals(1, kafkaChannel.getInetAddressToTenantContext().getInetAddressToAPIKey().size());
        Assertions.assertEquals("identifier", kafkaChannel.getInetAddressToTenantContext().getInetAddressToAPIKey().get(LOOPBACK_SOCKET_ADDRESS));
    }

    @Test
    public void testSending() throws IOException {
        ByteBufferSend sizePrefixed = ByteBufferSend.sizePrefixed(ByteBuffer.wrap(TestUtils.randomBytes(128)));
        NetworkSend networkSend = new NetworkSend(CHANNEL_ID, sizePrefixed);
        KAFKA_CHANNEL.setSend(networkSend);
        Assertions.assertTrue(KAFKA_CHANNEL.hasSend());
        Assertions.assertThrows(IllegalStateException.class, () -> {
            KAFKA_CHANNEL.setSend(networkSend);
        });
        Mockito.when(Long.valueOf(TRANSPORT.write((ByteBuffer[]) Mockito.any(ByteBuffer[].class)))).thenReturn(4L);
        Assertions.assertEquals(4L, KAFKA_CHANNEL.write());
        Assertions.assertEquals(128L, sizePrefixed.remaining());
        Assertions.assertNull(KAFKA_CHANNEL.maybeCompleteSend());
        Mockito.when(Long.valueOf(TRANSPORT.write((ByteBuffer[]) Mockito.any(ByteBuffer[].class)))).thenReturn(64L);
        Assertions.assertEquals(64L, KAFKA_CHANNEL.write());
        Assertions.assertEquals(64L, sizePrefixed.remaining());
        Assertions.assertNull(KAFKA_CHANNEL.maybeCompleteSend());
        Mockito.when(Long.valueOf(TRANSPORT.write((ByteBuffer[]) Mockito.any(ByteBuffer[].class)))).thenReturn(64L);
        Assertions.assertEquals(64L, KAFKA_CHANNEL.write());
        Assertions.assertEquals(0L, sizePrefixed.remaining());
        Assertions.assertEquals(networkSend, KAFKA_CHANNEL.maybeCompleteSend());
    }

    @Test
    public void testReceiving() throws IOException {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Integer.class);
        Mockito.when(POOL.tryAllocate(((Integer) forClass.capture()).intValue())).thenAnswer(invocationOnMock -> {
            return ByteBuffer.allocate(((Integer) forClass.getValue()).intValue());
        });
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(ByteBuffer.class);
        Mockito.when(Integer.valueOf(TRANSPORT.read((ByteBuffer) forClass2.capture()))).thenAnswer(invocationOnMock2 -> {
            ((ByteBuffer) forClass2.getValue()).putInt(128);
            return 4;
        }).thenReturn(0);
        Assertions.assertEquals(4L, KAFKA_CHANNEL.read());
        Assertions.assertEquals(4, KAFKA_CHANNEL.currentReceive().bytesRead());
        Assertions.assertNull(KAFKA_CHANNEL.maybeCompleteReceive());
        Mockito.reset(new TransportLayer[]{TRANSPORT});
        Mockito.when(Integer.valueOf(TRANSPORT.read((ByteBuffer) forClass2.capture()))).thenAnswer(invocationOnMock3 -> {
            ((ByteBuffer) forClass2.getValue()).put(TestUtils.randomBytes(64));
            return 64;
        });
        Assertions.assertEquals(64L, KAFKA_CHANNEL.read());
        Assertions.assertEquals(68, KAFKA_CHANNEL.currentReceive().bytesRead());
        Assertions.assertNull(KAFKA_CHANNEL.maybeCompleteReceive());
        Mockito.reset(new TransportLayer[]{TRANSPORT});
        Mockito.when(Integer.valueOf(TRANSPORT.read((ByteBuffer) forClass2.capture()))).thenAnswer(invocationOnMock4 -> {
            ((ByteBuffer) forClass2.getValue()).put(TestUtils.randomBytes(64));
            return 64;
        });
        Assertions.assertEquals(64L, KAFKA_CHANNEL.read());
        Assertions.assertEquals(132, KAFKA_CHANNEL.currentReceive().bytesRead());
        Assertions.assertNotNull(KAFKA_CHANNEL.maybeCompleteReceive());
        Assertions.assertNull(KAFKA_CHANNEL.currentReceive());
    }

    @Test
    public void testChannelTotalServiceTimeMetrics() throws IOException {
        TransportLayer transportLayer = (TransportLayer) Mockito.mock(SslTransportLayer.class);
        SaslServerAuthenticator saslServerAuthenticator = (SaslServerAuthenticator) Mockito.mock(SaslServerAuthenticator.class);
        ChannelReadinessMock channelReadinessMock = new ChannelReadinessMock();
        channelReadinessMock.shouldHandshake = true;
        channelReadinessMock.shouldAuthenticate = true;
        channelReadinessMock.shouldHandleProxy = true;
        ((SaslServerAuthenticator) Mockito.doAnswer(invocationOnMock -> {
            if (!channelReadinessMock.shouldAuthenticate || channelReadinessMock.authenticationCompleted) {
                return null;
            }
            MOCK_TIME.sleep(channelReadinessMock.authenticationTime.toMillis());
            channelReadinessMock.authenticationCompleted = true;
            return null;
        }).when(saslServerAuthenticator)).authenticate();
        ((SaslServerAuthenticator) Mockito.doAnswer(invocationOnMock2 -> {
            return Boolean.valueOf(channelReadinessMock.authenticationCompleted);
        }).when(saslServerAuthenticator)).complete();
        ((TransportLayer) Mockito.doAnswer(invocationOnMock3 -> {
            if (!channelReadinessMock.shouldHandleProxy || channelReadinessMock.proxyHandlingCompleted) {
                return null;
            }
            channelReadinessMock.proxyHandlingCompleted = true;
            return null;
        }).when(transportLayer)).handleProxyProtocol();
        ((TransportLayer) Mockito.doAnswer(invocationOnMock4 -> {
            return Boolean.valueOf(channelReadinessMock.proxyHandlingCompleted);
        }).when(transportLayer)).proxyProtocolReady();
        ((TransportLayer) Mockito.doAnswer(invocationOnMock5 -> {
            if (!channelReadinessMock.shouldHandshake || channelReadinessMock.handshakeCompleted) {
                return null;
            }
            MOCK_TIME.sleep(channelReadinessMock.handshakeTime.toMillis());
            channelReadinessMock.handshakeCompleted = true;
            return null;
        }).when(transportLayer)).handshake();
        ((TransportLayer) Mockito.doAnswer(invocationOnMock6 -> {
            return Boolean.valueOf(channelReadinessMock.handshakeCompleted && channelReadinessMock.proxyHandlingCompleted);
        }).when(transportLayer)).ready();
        KafkaChannel kafkaChannel = new KafkaChannel(CHANNEL_ID, transportLayer, () -> {
            return saslServerAuthenticator;
        }, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, new DefaultBrokerInterceptor(), MOCK_TIME, true, false, (String) null);
        kafkaChannel.setConnectionRegistrationTime(channelReadinessMock.registrationTime.toNanos());
        kafkaChannel.prepare();
        ((SaslServerAuthenticator) Mockito.verify(saslServerAuthenticator, Mockito.times(1))).authenticate();
        ((SaslServerAuthenticator) Mockito.verify(saslServerAuthenticator, Mockito.times(2))).complete();
        ((TransportLayer) Mockito.verify(transportLayer, Mockito.times(1))).handshake();
        ((TransportLayer) Mockito.verify(transportLayer, Mockito.times(3))).ready();
        Assertions.assertEquals(channelReadinessMock.handshakeTime.toNanos(), kafkaChannel.metrics().transportHandshakeTimeNanos());
        Assertions.assertEquals(channelReadinessMock.authenticationTime.toNanos(), kafkaChannel.metrics().authenticationTimeNanos());
        Assertions.assertEquals(channelReadinessMock.registrationTime.toNanos() + channelReadinessMock.handshakeTime.toNanos() + channelReadinessMock.authenticationTime.toNanos(), kafkaChannel.metrics().computeConnectionLocalServiceTimeNanos());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Duration randomTimeDuration() {
        return Duration.ofMillis(1 + new Random(System.nanoTime()).nextInt(10));
    }

    @Test
    public void testSocketAddressWithoutProxyProtocol() throws IOException {
        KafkaChannel createKafkaChannel = createKafkaChannel(new ProxyProtocolEngineFactory(ProxyProtocol.NONE));
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PROCESSED, createKafkaChannel.proxyState());
        createKafkaChannel.prepare();
        Assertions.assertEquals(LOOPBACK_SOCKET_ADDRESS, createKafkaChannel.socketAddress());
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PROCESSED, createKafkaChannel.proxyState());
    }

    @Test
    public void testSocketAddressWithProxyProtocol() throws IOException {
        InetAddress byName = InetAddress.getByName("10.10.10.10");
        testSocketAddress(new ProxyProtocolEngineFactory(() -> {
            return new TestProxyProtocolEngine(byName, 9092);
        }), byName);
    }

    @Test
    public void testUnreadableSocket() throws IOException {
        SelectionKey selectionKey = (SelectionKey) Mockito.mock(SelectionKey.class);
        Mockito.when(Boolean.valueOf(selectionKey.isReadable())).thenReturn(false).thenReturn(true);
        SocketChannel socketChannel = (SocketChannel) Mockito.mock(SocketChannel.class);
        Mockito.when(Integer.valueOf(socketChannel.read((ByteBuffer) Mockito.any(ByteBuffer.class)))).thenReturn(-1);
        KafkaChannel createKafkaChannel = createKafkaChannel(new ProxyProtocolEngineFactory(ProxyProtocol.V1), selectionKey, socketChannel);
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PENDING, createKafkaChannel.proxyState());
        createKafkaChannel.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PENDING, createKafkaChannel.proxyState());
        Assertions.assertFalse(createKafkaChannel.hasBytesBuffered());
        createKafkaChannel.getClass();
        Assertions.assertThrows(EOFException.class, createKafkaChannel::prepare);
    }

    @Test
    public void testRequestSequenceId() throws IOException {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Integer.class);
        Mockito.when(POOL.tryAllocate(((Integer) forClass.capture()).intValue())).thenAnswer(invocationOnMock -> {
            return ByteBuffer.allocate(((Integer) forClass.getValue()).intValue());
        });
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(ByteBuffer.class);
        Mockito.when(Integer.valueOf(TRANSPORT.read((ByteBuffer) forClass2.capture()))).thenAnswer(invocationOnMock2 -> {
            ((ByteBuffer) forClass2.getValue()).putInt(4);
            return 4;
        });
        Assertions.assertNull(KAFKA_CHANNEL.currentReceive());
        for (int i = 1; i < 5; i++) {
            KAFKA_CHANNEL.read();
            Assertions.assertEquals(i, KAFKA_CHANNEL.currentReceive().sequenceId());
            KAFKA_CHANNEL.maybeCompleteReceive();
        }
    }

    private void testSocketAddress(ProxyProtocolEngineFactory proxyProtocolEngineFactory, InetAddress inetAddress) throws IOException {
        KafkaChannel createKafkaChannel = createKafkaChannel(proxyProtocolEngineFactory);
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PENDING, createKafkaChannel.proxyState());
        createKafkaChannel.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_READY, createKafkaChannel.proxyState());
        createKafkaChannel.proxyState(KafkaChannel.ChannelProxyState.PROXY_PROCESSED);
        createKafkaChannel.prepare();
        Assertions.assertEquals(KafkaChannel.ChannelProxyState.PROXY_PROCESSED, createKafkaChannel.proxyState());
        Assertions.assertEquals(inetAddress, createKafkaChannel.socketAddress());
    }

    private KafkaChannel createKafkaChannel(ProxyProtocolEngineFactory proxyProtocolEngineFactory) {
        return createKafkaChannel(proxyProtocolEngineFactory, null, null);
    }

    private KafkaChannel createKafkaChannel(ProxyProtocolEngineFactory proxyProtocolEngineFactory, SelectionKey selectionKey, SocketChannel socketChannel) {
        PlaintextChannelBuilder plaintextChannelBuilder = new PlaintextChannelBuilder(Mode.SERVER, new ListenerName("EXTERNAL"), proxyProtocolEngineFactory);
        plaintextChannelBuilder.configure(Collections.emptyMap());
        if (selectionKey == null) {
            selectionKey = (SelectionKey) Mockito.mock(SelectionKey.class);
            Mockito.when(Boolean.valueOf(selectionKey.isReadable())).thenReturn(true);
        }
        if (socketChannel == null) {
            socketChannel = (SocketChannel) Mockito.mock(SocketChannel.class);
        }
        Socket socket = (Socket) Mockito.mock(Socket.class);
        Mockito.when(selectionKey.channel()).thenReturn(socketChannel);
        Mockito.when(socketChannel.socket()).thenReturn(socket);
        Mockito.when(socket.getInetAddress()).thenReturn(LOOPBACK_SOCKET_ADDRESS);
        return plaintextChannelBuilder.buildChannel(CHANNEL_ID, selectionKey, MAX_RECEIVE_SIZE, new MockAsyncAuthExecutor(), POOL, METADATA_REGISTRY, MOCK_TIME);
    }
}
