package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.network.ProxyTlvType;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/common/network/ProxyProtocolV2EngineTest.class */
public class ProxyProtocolV2EngineTest extends AbstractProxyProtocolEngineTest {
    private String bytesToHexStr(byte[] bArr) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bArr) {
            String hexString = Integer.toHexString(b & 255);
            if (hexString.length() == 1) {
                sb.append(KafkaChannelTest.CHANNEL_ID);
            }
            sb.append(hexString);
        }
        return sb.toString();
    }

    private byte[] hexStrToBytes(String str) {
        byte[] bArr = new byte[str.length() / 2];
        for (int i = 0; i < str.length(); i += 2) {
            bArr[i / 2] = (byte) Integer.parseInt(str.substring(i, i + 2), 16);
        }
        return bArr;
    }

    @Test
    public void testLkcId() throws IOException {
        ProxyProtocolEngine newClientPPE = newClientPPE("127.0.0.1", 8888, DEFAULT_COMMAND, "lkc-4proxy");
        assertValidEngineState(newClientPPE, true, false, newClientPPE.clientAddress(), Integer.valueOf(newClientPPE.clientPort()), newClientPPE.command(), Collections.singletonList(newClientPPE.tlv(ProxyTlvType.LKC_ID)));
        ByteBuffer wrap = ByteBuffer.wrap(newClientPPE.emitHeaders(InetAddress.getByName(newClientPPE.clientAddress() instanceof Inet4Address ? "127.0.0.1" : "2001:db8:85a3:8d3:1319:8a2e:370:7349"), 9092));
        ProxyProtocolEngine newServerPPE = newServerPPE();
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        newServerPPE.processHeaders(wrap);
        assertValidEngineState(newServerPPE, true, true, InetAddress.getByName("127.0.0.1"), 8888, DEFAULT_COMMAND, Collections.singletonList(DEFAULT_TLV));
    }

    @Test
    public void testCCloudTrafficType() throws IOException {
        ProxyProtocolEngine newClientPPE = newClientPPE("127.0.0.1", 8888, DEFAULT_COMMAND, "lkc-4proxy", DEFAULT_CCLOUD_TRAFFIC_TYPE);
        assertValidEngineState(newClientPPE, true, false, newClientPPE.clientAddress(), Integer.valueOf(newClientPPE.clientPort()), newClientPPE.command(), Arrays.asList(newClientPPE.tlv(ProxyTlvType.LKC_ID), newClientPPE.tlv(ProxyTlvType.CCLOUD_TRAFFIC_TYPE)));
        ByteBuffer wrap = ByteBuffer.wrap(newClientPPE.emitHeaders(InetAddress.getByName(newClientPPE.clientAddress() instanceof Inet4Address ? "127.0.0.1" : "2001:db8:85a3:8d3:1319:8a2e:370:7349"), 9092));
        ProxyProtocolEngine newServerPPE = newServerPPE();
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        newServerPPE.processHeaders(wrap);
        assertValidEngineState(newServerPPE, true, true, InetAddress.getByName("127.0.0.1"), 8888, DEFAULT_COMMAND, Arrays.asList(DEFAULT_TLV, DEFAULT_CCLOUD_TRAFFIC_TYPE_TLV));
    }

    @Test
    public void testProxyProtocolV2EngineBasic() throws IOException {
        ProxyProtocolEngine newServerPPE = newServerPPE();
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        ByteBuffer putStream = putStream("0D0A0D0A000D0A515549540A2111000CD83AD0EE0000000022B822B80D0A03");
        newServerPPE.processHeaders(putStream);
        assertValidEngineState(newServerPPE, true, true, InetAddress.getByAddress(hexStrToBytes("D83AD0EE")), 8888, ProxyProtocolCommand.PROXY);
        Assertions.assertEquals(3, putStream.remaining());
    }

    @Test
    public void testProxyProtocolV2EngineIpv6() throws IOException {
        ProxyProtocolEngine newServerPPE = newServerPPE();
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        ByteBuffer putStream = putStream("0D0A0D0A000D0A515549540A2121002420010DB8AC10FE0100000000000000000000000000000000000000000000000022B822B80D0A");
        newServerPPE.processHeaders(putStream);
        assertValidEngineState(newServerPPE, true, true, InetAddress.getByAddress(hexStrToBytes("20010DB8AC10FE010000000000000000")), 8888, ProxyProtocolCommand.PROXY);
        Assertions.assertEquals(2, putStream.remaining());
    }

    @Test
    public void testProxyProtocolV2EngineWithMultipleSegments() throws IOException {
        ProxyProtocolEngine newServerPPE = newServerPPE();
        ByteBuffer allocate = ByteBuffer.allocate(31);
        putStreamAndFlip(allocate, "0D0A0D0A000D");
        newServerPPE.processHeaders(allocate);
        allocate.compact();
        Assertions.assertFalse(newServerPPE.hasClientInformation());
        Assertions.assertFalse(newServerPPE.ready());
        putStreamAndFlip(allocate, "0A515549540A21");
        newServerPPE.processHeaders(allocate);
        allocate.compact();
        Assertions.assertFalse(newServerPPE.hasClientInformation());
        Assertions.assertFalse(newServerPPE.ready());
        putStreamAndFlip(allocate, "11000CD83AD0EE0000000022B822B80D0A03");
        newServerPPE.processHeaders(allocate);
        allocate.compact();
        Assertions.assertTrue(newServerPPE.hasClientInformation());
        Assertions.assertTrue(newServerPPE.ready());
        Assertions.assertEquals(InetAddress.getByAddress(hexStrToBytes("D83AD0EE")), newServerPPE.clientAddress());
        Assertions.assertEquals(8888, newServerPPE.clientPort());
    }

    @Test
    public void testNonProxyHeaderFallbackEnabled() throws IOException {
        ProxyProtocolEngine newServerPPE = newServerPPE();
        newServerPPE.configure(Collections.singletonMap("confluent.proxy.protocol.fallback.enabled", true));
        ByteBuffer putStream = putStream("AD0EE0000000022B822B0A0D");
        newServerPPE.processHeaders(putStream);
        putStream.compact();
        Assertions.assertFalse(newServerPPE.hasClientInformation());
        Assertions.assertTrue(newServerPPE.ready());
        putStream.flip();
        Assertions.assertEquals(12, putStream.remaining());
    }

    @Test
    public void testProxyProtocolV2LocalConnection() throws IOException {
        ProxyProtocolEngine newServerPPE = newServerPPE();
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        ByteBuffer putStream = putStream("0D0A0D0A000D0A515549540A20000000CD83AD0EE0000000022B822B");
        newServerPPE.processHeaders(putStream);
        assertValidEngineState(newServerPPE, false, true, null, -1, ProxyProtocolCommand.LOCAL);
        Assertions.assertEquals(hexStrToBytes("CD83AD0EE0000000022B822B").length, putStream.remaining());
    }

    @Test
    public void testMalformedLkcIdForClient() {
        String str = "this-is-not-a-valid-lkc-id";
        Assertions.assertThrows(ConfigException.class, () -> {
            newClientPPE("216.58.208.238", 8888, DEFAULT_COMMAND, str);
        });
    }

    @Test
    public void testLkcIdForClientIsOptional() {
        Assertions.assertNotNull(newClientPPE("216.58.208.238", 8888, DEFAULT_COMMAND, "lkc-4proxy").tlv(ProxyTlvType.LKC_ID));
        Assertions.assertNull(newClientPPE("216.58.208.238", 8888, DEFAULT_COMMAND, null).tlv(ProxyTlvType.LKC_ID));
    }

    @Test
    public void testAddressAndPortOptionalInLocalMode() {
        Assertions.assertNotNull(newClientPPE(null, null, ProxyProtocolCommand.LOCAL, "lkc-4proxy"));
        Assertions.assertThrows(ConfigException.class, () -> {
            newClientPPE(null, null, ProxyProtocolCommand.PROXY, "lkc-4proxy");
        });
    }

    @Test
    public void testMalformedLkcIdForServer() throws IOException {
        String bytesToHexStr = bytesToHexStr(ProxyProtocolV2Engine.tlvBytes(ProxyTlv.createForSubtype(ProxyTlvType.LKC_ID, "this-is-not-a-valid-lkc-id")));
        newServerPPE().processHeaders(putStream("0D0A0D0A000D0A515549540A2111" + ("00" + Integer.toHexString(12 + (bytesToHexStr.length() / 2))) + "D83AD0EE7f00000122B82384" + bytesToHexStr));
    }

    @Test
    public void testMultipleTlvs() throws IOException {
        ProxyTlvType build = new ProxyTlvType.Builder(2).build();
        ProxyTlv proxyTlv = new ProxyTlv(build, hexStrToBytes("636C75737465722E636F6E666C75656E742E696F"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(DEFAULT_TLV);
        arrayList.add(DEFAULT_CCLOUD_TRAFFIC_TYPE_TLV);
        arrayList.add(proxyTlv);
        String bytesToHexStr = bytesToHexStr(ProxyProtocolV2Engine.tlvBytes(arrayList));
        String str = "00" + Integer.toHexString(12 + (bytesToHexStr.length() / 2));
        ProxyProtocolEngine newServerPPE = newServerPPE();
        ByteBuffer wrap = ByteBuffer.wrap(generateHeader("0D0A0D0A000D0A515549540A2111" + str + "D83AD0EE7f00000122B82384" + bytesToHexStr));
        assertValidEngineState(newServerPPE, false, false, null, -1, null);
        newServerPPE.processHeaders(wrap);
        assertValidEngineState(newServerPPE, true, true, InetAddress.getByName("216.58.208.238"), 8888, DEFAULT_COMMAND, Arrays.asList(DEFAULT_TLV, DEFAULT_CCLOUD_TRAFFIC_TYPE_TLV));
        ProxyTlv tlv = newServerPPE.tlv(build);
        Assertions.assertNotNull(tlv);
        Assertions.assertEquals(proxyTlv, tlv);
        Assertions.assertEquals("cluster.confluent.io", new String(tlv.rawValue(), StandardCharsets.UTF_8));
    }

    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected ProxyProtocolEngine newServerPPE() {
        return new ProxyProtocolV2Engine(Mode.SERVER, new LogContext());
    }

    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected ProxyProtocolEngine newClientPPE() {
        return new ProxyProtocolV2Engine(Mode.CLIENT, new LogContext());
    }

    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected byte[] generateHeader(String str) {
        return hexStrToBytes(str);
    }

    @Override // org.apache.kafka.common.network.AbstractProxyProtocolEngineTest
    protected List<String> getInvalidProxyHeaders() {
        return Arrays.asList("0D0A0D0A000D0A515549540A2112000CD83AD0EE0000000022B822B8", "0D0A0D0A000D0A515549540A2122002420010DB8AC10FE0100000000000000000000000000000000000000000000000022B822B80D0A", "0D0D0D0A000D0A515549540A2111000CD83AD0EE0000000022B822B80D0A03", "0D0A0D0A000D0A515549540A1111000CD83AD0EE0000000022B822B80D0A03", "0D0A0D0A000D0A515549540A2211000CD83AD0EE0000000022B822B80D0A03");
    }
}
