package io.confluent.kafkarest.integration.v3;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.testing.DefaultKafkaRestTestEnvironment;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for the {@code api.v3.produce.request.size.limit.max.bytes} setting of the
 * v3 streaming produce endpoint ({@code /v3/clusters/{clusterId}/topics/{topic}/records}).
 *
 * <p>Each test streams several JSON produce requests over a single chunked HTTP POST and then
 * reads the response body line by line, parsing every line as one {@link TestProduceResponse}.
 * The tests expect that records within the limit are produced (error code 200 with consecutive
 * offsets) and that the first record exceeding the limit yields a 400 result after which no
 * further results are returned.
 */
@Tag("IntegrationTest")
public class ProduceActionRequestSizeLimitIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(ProduceActionRequestSizeLimitIntegrationTest.class);

    private static final String TEST_MESSAGE_TEMPLATE = "{ \"value\" : { \"type\" : \"JSON\", \"data\" : \"%s\" }}";
    private static final String TEST_TOPIC_NAME = "test-topic-1";

    /** Length of the random payload embedded in a regular (within-limit) record: 1 MiB. */
    private static final int TEST_DATA_SIZE = 1048576;

    /** Value configured as the request size limit: {@code TEST_DATA_SIZE} plus 8 KiB of headroom. */
    private static final int LIMIT_SIZE = 1056768;

    /** Payload length that is guaranteed to exceed {@link #LIMIT_SIZE} (evaluates to 1064960). */
    private static final int OVERSIZED_DATA_SIZE = LIMIT_SIZE + 8192;

    private static final String SIZE_LIMIT_ERROR_MESSAGE = "Produce request size is larger than allowed threshold";

    private static final Random RNG = new Random();
    private static final ObjectMapper TEST_RESPONSE_OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // NOTE(review): the boolean flag presumably disables automatic start of the REST app, since
    // setUp() starts it explicitly with per-test properties - confirm against the environment class.
    @RegisterExtension
    public final DefaultKafkaRestTestEnvironment testEnv = new DefaultKafkaRestTestEnvironment(false);

    private HttpClient httpClient;

    /**
     * Subset of a single produce result that these tests care about. Unknown JSON properties are
     * ignored by {@link #TEST_RESPONSE_OBJECT_MAPPER}; fields absent from the JSON keep the
     * sentinel defaults below.
     */
    private static class TestProduceResponse {

        @JsonProperty("offset")
        private long offset = -1;

        @JsonProperty("error_code")
        private int errorCode = -1;

        @JsonProperty("message")
        private String message = "";

        private TestProduceResponse() {
        }

        public void setOffset(long offset) {
            this.offset = offset;
        }

        public void setErrorCode(int errorCode) {
            this.errorCode = errorCode;
        }

        public void setMessage(String message) {
            this.message = message;
        }
    }

    /**
     * Starts the REST app, creates the test topic and starts an HTTP client that uses the
     * "kafka-rest" SSL context of the test environment.
     *
     * <p>The size limit is configured for every test except those whose display name contains
     * {@code ProduceRequestSizeNoLimit}.
     *
     * @param testInfo JUnit metadata of the test about to run; only its display name is read
     * @throws Exception if the app, the topic or the HTTP client cannot be set up
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        Properties restProperties = new Properties();
        boolean limitEnabled = !testInfo.getDisplayName().contains("ProduceRequestSizeNoLimit");
        if (limitEnabled) {
            restProperties.put("api.v3.produce.request.size.limit.max.bytes", String.valueOf(LIMIT_SIZE));
        }
        this.testEnv.kafkaRest().startApp(restProperties);
        this.testEnv.kafkaCluster().createTopic(TEST_TOPIC_NAME, 1, (short) 1);

        SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
        sslContextFactory.setSslContext(this.testEnv.certificates().getSslContext("kafka-rest"));
        this.httpClient = new HttpClient(sslContextFactory);
        this.httpClient.start();
    }

    /**
     * Stops the HTTP client and closes the REST app.
     *
     * @throws Exception if stopping the client or closing the app fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        try {
            // setUp() may have failed before the client was created; do not mask that failure
            // with a NullPointerException here.
            if (this.httpClient != null) {
                this.httpClient.stop();
            }
        } finally {
            // Close the app even when stopping the client throws.
            this.testEnv.kafkaRest().closeApp();
        }
    }

    /**
     * Without a configured limit, five regular records are all expected to be produced.
     *
     * @param str value supplied by {@code @ValueSource} ("kraft" or "zk"); not read by the test body
     */
    @DisplayName("testStreaming_ProduceRequestSizeNoLimit")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void testStreaming_ProduceRequestSizeNoLimit(String str) throws Exception {
        ArrayList<TestProduceResponse> responses = produceRecords(5);

        assertProduced(responses, 5);
    }

    /**
     * With the limit configured, ten regular records (each below the limit) are all expected to be
     * produced.
     *
     * @param str value supplied by {@code @ValueSource} ("kraft" or "zk"); not read by the test body
     */
    @DisplayName("testStreaming_ProduceRequestSizeWithLimit_withinLimit")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void testStreaming_ProduceRequestSizeWithLimit_withinLimit(String str) throws Exception {
        ArrayList<TestProduceResponse> responses = produceRecords(10);

        assertProduced(responses, 10);
    }

    /**
     * When the very first record exceeds the limit, exactly one result - the 400 error - is
     * expected, even though ten regular records follow in the request.
     *
     * @param str value supplied by {@code @ValueSource} ("kraft" or "zk"); not read by the test body
     */
    @DisplayName("testStreaming_ProduceRequestSizeWithLimit_violateLimitFirstMessage")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void testStreaming_ProduceRequestSizeWithLimit_violateLimitFirstMessage(String str) throws Exception {
        ArrayList<TestProduceResponse> responses = produceRecords(10, OVERSIZED_DATA_SIZE);

        Assertions.assertEquals(1, responses.size());
        assertSizeLimitViolation(responses.get(0));
    }

    /**
     * When the second record exceeds the limit, the first record is expected to be produced at
     * offset 0 and the second result is expected to be the 400 error; nothing follows it.
     *
     * @param str value supplied by {@code @ValueSource} ("kraft" or "zk"); not read by the test body
     */
    @DisplayName("testStreaming_ProduceRequestSizeWithLimit_violateLimitSecondMessage")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void testStreaming_ProduceRequestSizeWithLimit_violateLimitSecondMessage(String str) throws Exception {
        ArrayList<TestProduceResponse> responses = produceRecords(10, TEST_DATA_SIZE, OVERSIZED_DATA_SIZE);

        Assertions.assertEquals(2, responses.size());
        TestProduceResponse first = responses.get(0);
        Assertions.assertEquals(200, first.errorCode);
        Assertions.assertEquals(0L, first.offset);
        assertSizeLimitViolation(responses.get(1));
    }

    /** Builds the URI of the records endpoint of the test topic on the test cluster. */
    private URI recordsUri() {
        String path = "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TEST_TOPIC_NAME + "/records";
        return this.testEnv.kafkaRest().target().path(path).getUri();
    }

    /**
     * Streams produce requests to the records endpoint over one chunked POST and collects the
     * per-record results.
     *
     * <p>The request body is written on the HTTP client's executor: first one record per entry of
     * {@code leadingDataSizes}, then {@code regularRecordCount} records of {@link #TEST_DATA_SIZE}.
     * The calling thread waits up to one minute for the response headers and, if the status is
     * 200, parses each line of the response body as a {@link TestProduceResponse}.
     *
     * @param regularRecordCount number of {@code TEST_DATA_SIZE} records written after the leading ones
     * @param leadingDataSizes payload lengths of the records written first, in order
     * @return the parsed results in response order; empty if the HTTP status was not 200
     * @throws Exception if waiting for the response fails or times out, or the body cannot be read or parsed
     */
    private ArrayList<TestProduceResponse> produceRecords(int regularRecordCount, int... leadingDataSizes) throws Exception {
        URI uri = recordsUri();
        OutputStreamContentProvider requestBody = new OutputStreamContentProvider();
        InputStreamResponseListener responseListener = new InputStreamResponseListener();
        this.httpClient.POST(uri)
                .header(HttpHeader.TRANSFER_ENCODING, "chunked")
                .content(requestBody, "application/json")
                .send(responseListener);

        this.httpClient.getExecutor().execute(() -> {
            // try-with-resources closes the request stream on failure as well, so a write error
            // terminates the request instead of leaving it open until the response wait times out.
            try (OutputStream out = requestBody.getOutputStream()) {
                for (int dataSize : leadingDataSizes) {
                    out.write(generateData(dataSize).getBytes(StandardCharsets.UTF_8));
                }
                for (int i = 0; i < regularRecordCount; i++) {
                    out.write(generateData(TEST_DATA_SIZE).getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                log.error("Error writing to output stream", e);
            }
        });

        ArrayList<TestProduceResponse> responses = new ArrayList<>();
        if (responseListener.get(1L, TimeUnit.MINUTES).getStatus() == 200) {
            try (InputStream in = responseListener.getInputStream()) {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    responses.add(TEST_RESPONSE_OBJECT_MAPPER.readValue(line, TestProduceResponse.class));
                }
            }
        }
        return responses;
    }

    /**
     * Asserts that exactly {@code expectedCount} results were received, each with error code 200
     * and with offsets 0, 1, 2, ... in response order.
     */
    private static void assertProduced(ArrayList<TestProduceResponse> responses, int expectedCount) {
        Assertions.assertEquals(expectedCount, responses.size());
        for (int i = 0; i < responses.size(); i++) {
            TestProduceResponse response = responses.get(i);
            Assertions.assertEquals(200, response.errorCode);
            Assertions.assertEquals(i, response.offset);
        }
    }

    /** Asserts that the result is a 400 error whose message reports the size-limit violation. */
    private static void assertSizeLimitViolation(TestProduceResponse response) {
        Assertions.assertEquals(400, response.errorCode);
        MatcherAssert.assertThat(response.message, Matchers.containsString(SIZE_LIMIT_ERROR_MESSAGE));
    }

    /**
     * Builds one JSON produce request whose {@code data} value is a random alphanumeric string.
     *
     * @param i length of the random {@code data} string (the full request is slightly longer
     *     because of the surrounding JSON template)
     */
    private static String generateData(int i) {
        return String.format(TEST_MESSAGE_TEMPLATE, TestUtils.generateAlphanumericString(RNG, i));
    }
}
