package io.confluent.kafkarest.integration.v3;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.testing.DefaultKafkaRestTestEnvironment;
import io.confluent.kafkarest.testing.SchemaRegistryFixture;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Disabled
@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/ProduceActionIntegrationTest.class */
public class ProduceActionIntegrationTest {
    private static final String TOPIC_NAME = "topic-1";
    private static final String DEFAULT_KEY_SUBJECT = "topic-1-key";
    private static final String DEFAULT_VALUE_SUBJECT = "topic-1-value";

    @RegisterExtension
    public final DefaultKafkaRestTestEnvironment testEnv = new DefaultKafkaRestTestEnvironment(false);

    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        Properties properties = new Properties();
        if (testInfo.getDisplayName().contains("CallerIsRateLimited")) {
            properties.put("rate.limit.enable", "true");
            properties.put("api.v3.produce.rate.limit.enabled", "true");
            properties.put("rate.limit.backend", "resilience4j");
            if (testInfo.getDisplayName().contains("test_whenGlobalByteLimitReached_thenCallerIsRateLimited")) {
                properties.put("api.v3.produce.rate.limit.max.bytes.global.per.sec", "1");
            }
            if (testInfo.getDisplayName().contains("test_whenClusterByteLimitReached_thenCallerIsRateLimited")) {
                properties.put("api.v3.produce.rate.limit.max.bytes.per.sec", "1");
            }
            if (testInfo.getDisplayName().contains("test_whenGlobalRequestCountLimitReached_thenCallerIsRateLimited")) {
                properties.put("api.v3.produce.rate.limit.max.requests.global.per.sec", "1");
            }
            if (testInfo.getDisplayName().contains("test_whenClusterRequestCountLimitReached_thenCallerIsRateLimited")) {
                properties.put("api.v3.produce.rate.limit.max.requests.per.sec", "1");
            }
        }
        this.testEnv.kafkaRest().startApp(properties);
        this.testEnv.kafkaCluster().createTopic(TOPIC_NAME, 3, (short) 1);
    }

    @AfterEach
    public void tearDown() {
        this.testEnv.kafkaRest().closeApp();
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinary(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        ByteString copyFromUtf82 = ByteString.copyFromUtf8("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf82.toByteArray())).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals(copyFromUtf82, ByteString.copyFrom((byte[]) record.value()));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithNullData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithInvalidData_throwsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(IntNode.valueOf(1)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(TextNode.valueOf("fooba")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceString(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithEmptyData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertTrue(((String) record.key()).isEmpty());
        Assertions.assertTrue(((String) record.value()).isEmpty());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithNullData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceWithInvalidData_throwsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"records\": {\"subject\": \"foobar\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ErrorResponse errorResponse = (ErrorResponse) post.readEntity(ErrorResponse.class);
        Assertions.assertEquals(400, errorResponse.getErrorCode());
        Assertions.assertEquals("Unrecognized field \"records\" (class io.confluent.kafkarest.entities.v3.AutoValue_ProduceRequest$Builder), not marked as ignorable (6 known properties: \"value\", \"originalSize\", \"partitionId\", \"headers\", \"key\", \"timestamp\"])", errorResponse.getMessage());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJson(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        KafkaJsonDeserializer kafkaJsonDeserializer = new KafkaJsonDeserializer();
        kafkaJsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), kafkaJsonDeserializer, kafkaJsonDeserializer);
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonWithNullData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        KafkaJsonDeserializer kafkaJsonDeserializer = new KafkaJsonDeserializer();
        kafkaJsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), kafkaJsonDeserializer, kafkaJsonDeserializer);
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchema(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndNullData_throwsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndInvalidData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(IntNode.valueOf(1)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(IntNode.valueOf(2)).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaId(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}")).getSchemaId())).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}")).getSchemaId())).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaVersion(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}")).getSchemaVersion())).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}")).getSchemaVersion())).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithLatestSchema(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndSubject(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setSubject("my-key-subject").setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setSubject("my-value-subject").setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaIdAndSubject(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setSchemaId(Integer.valueOf(this.testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}")).getSchemaId())).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setSchemaId(Integer.valueOf(this.testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}")).getSchemaId())).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaVersionAndSubject(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setSchemaVersion(Integer.valueOf(this.testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}")).getSchemaVersion())).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setSchemaVersion(Integer.valueOf(this.testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}")).getSchemaVersion())).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithLatestSchemaAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        this.testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}"));
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}").setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}").setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        GenericData.Record record2 = new GenericData.Record(new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}").rawSchema());
        record2.put("foo", "foz");
        GenericData.Record record3 = new GenericData.Record(new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}").rawSchema());
        record3.put("bar", "baz");
        Assertions.assertEquals(record2, record.key());
        Assertions.assertEquals(record3, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaIdAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema avroSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, true, avroSchema), avroSchema);
        ParsedSchema avroSchema2 = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, false, avroSchema2), avroSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        GenericData.Record record2 = new GenericData.Record(avroSchema.rawSchema());
        record2.put("foo", "foz");
        GenericData.Record record3 = new GenericData.Record(avroSchema2.rawSchema());
        record3.put("bar", "baz");
        Assertions.assertEquals(record2, record.key());
        Assertions.assertEquals(record3, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaVersionAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema avroSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, true, avroSchema), avroSchema);
        ParsedSchema avroSchema2 = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, false, avroSchema2), avroSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        GenericData.Record record2 = new GenericData.Record(avroSchema.rawSchema());
        record2.put("foo", "foz");
        GenericData.Record record3 = new GenericData.Record(avroSchema2.rawSchema());
        record3.put("bar", "baz");
        Assertions.assertEquals(record2, record.key());
        Assertions.assertEquals(record3, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithLatestSchemaAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema avroSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, true, avroSchema), avroSchema);
        ParsedSchema avroSchema2 = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, false, avroSchema2), avroSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createAvroDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        GenericData.Record record2 = new GenericData.Record(avroSchema.rawSchema());
        record2.put("foo", "foz");
        GenericData.Record record3 = new GenericData.Record(avroSchema2.rawSchema());
        record3.put("bar", "baz");
        Assertions.assertEquals(record2, record.key());
        Assertions.assertEquals(record3, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithRawSchema(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithRawSchemaAndNullData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithRawSchemaAndInvalidData_throwsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(IntNode.valueOf(1)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setRawSchema("{\"type\": \"string\"}").setData(IntNode.valueOf(2)).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaId(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaVersion(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithLatestSchema(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithRawSchemaAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setSubject("my-key-subject").setRawSchema("{\"type\": \"string\"}").setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setSubject("my-value-subject").setRawSchema("{\"type\": \"string\"}").setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaIdAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaVersionAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithLatestSchemaAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode valueOf = TextNode.valueOf("foo");
        TextNode valueOf2 = TextNode.valueOf("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setData(valueOf).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setData(valueOf2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(valueOf, record.key());
        Assertions.assertEquals(valueOf2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithRawSchemaAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}").setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSONSCHEMA).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}").setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(objectNode, record.key());
        Assertions.assertEquals(objectNode2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaIdAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema jsonSchema = new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, true, jsonSchema), jsonSchema);
        ParsedSchema jsonSchema2 = new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, false, jsonSchema2), jsonSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(objectNode, record.key());
        Assertions.assertEquals(objectNode2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithSchemaVersionAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema jsonSchema = new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, true, jsonSchema), jsonSchema);
        ParsedSchema jsonSchema2 = new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, false, jsonSchema2), jsonSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(objectNode, record.key());
        Assertions.assertEquals(objectNode2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonschemaWithLatestSchemaAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema jsonSchema = new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, true, jsonSchema), jsonSchema);
        ParsedSchema jsonSchema2 = new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        this.testEnv.schemaRegistry().createSchema(new TopicNameStrategy().subjectName(TOPIC_NAME, false, jsonSchema2), jsonSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer(), this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assertions.assertEquals(objectNode, record.key());
        Assertions.assertEquals(objectNode2, record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithRawSchema(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProtobufSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(protobufSchema.canonicalString()).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(protobufSchema2.canonicalString()).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithRawSchemaAndNullData(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }").canonicalString()).setData(NullNode.getInstance()).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }").canonicalString()).setData(NullNode.getInstance()).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithRawSchemaAndInvalidData_throwsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }").canonicalString()).setData(IntNode.valueOf(1)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setRawSchema(new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }").canonicalString()).setData(IntNode.valueOf(2)).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithSchemaId(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithSchemaVersion(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithLatestSchema(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithRawSchemaAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProtobufSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setSubject("my-key-subject").setRawSchema(protobufSchema.canonicalString()).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setSubject("my-value-subject").setRawSchema(protobufSchema2.canonicalString()).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithSchemaIdAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema("my-key-schema", protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema("my-value-schema", protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-schema").setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-schema").setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithSchemaVersionAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema("my-key-schema", protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema("my-value-schema", protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-schema").setSchemaVersion(Integer.valueOf(createSchema.getSchemaVersion())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-schema").setSchemaVersion(Integer.valueOf(createSchema2.getSchemaVersion())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithLatestSchemaAndSubject(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        this.testEnv.schemaRegistry().createSchema("my-key-subject", protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        this.testEnv.schemaRegistry().createSchema("my-value-subject", protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubject("my-key-subject").setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubject("my-value-subject").setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithRawSchemaAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProtobufSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema(protobufSchema.canonicalString()).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.PROTOBUF).setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setRawSchema(protobufSchema2.canonicalString()).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceProtobufWithSchemaIdAndSubjectStrategy(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ParsedSchema protobufSchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, true, protobufSchema), protobufSchema);
        ParsedSchema protobufSchema2 = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey createSchema2 = this.testEnv.schemaRegistry().createSchema(new RecordNameStrategy().subjectName(TOPIC_NAME, false, protobufSchema2), protobufSchema2);
        ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
        objectNode.put("foo", "foz");
        ObjectNode objectNode2 = new ObjectNode(JsonNodeFactory.instance);
        objectNode2.put("bar", "baz");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema.getSchemaId())).setData(objectNode).build()).setValue(ProduceRequest.ProduceRequestData.builder().setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME).setSchemaId(Integer.valueOf(createSchema2.getSchemaId())).setData(objectNode2).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), this.testEnv.schemaRegistry().createProtobufDeserializer(), this.testEnv.schemaRegistry().createProtobufDeserializer());
        DynamicMessage.Builder newBuilder = DynamicMessage.newBuilder(protobufSchema.toDescriptor());
        newBuilder.setField(protobufSchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder newBuilder2 = DynamicMessage.newBuilder(protobufSchema2.toDescriptor());
        newBuilder2.setField(protobufSchema2.toDescriptor().findFieldByName("bar"), "baz");
        Assertions.assertEquals(newBuilder.build().toByteString(), ((Message) record.key()).toByteString());
        Assertions.assertEquals(newBuilder2.build().toByteString(), ((Message) record.value()).toByteString());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithPartitionId(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        ByteString copyFromUtf82 = ByteString.copyFromUtf8("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setPartitionId(1).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf82.toByteArray())).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, 1, readProduceResponse(post).getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals(copyFromUtf82, ByteString.copyFrom((byte[]) record.value()));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithTimestamp(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        Instant ofEpochMilli = Instant.ofEpochMilli(1000L);
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        ByteString copyFromUtf82 = ByteString.copyFromUtf8("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf82.toByteArray())).build()).setTimestamp(ofEpochMilli).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals(copyFromUtf82, ByteString.copyFrom((byte[]) record.value()));
        Assertions.assertEquals(ofEpochMilli, Instant.ofEpochMilli(record.timestamp()));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithHeaders(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        ByteString copyFromUtf82 = ByteString.copyFromUtf8("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setHeaders(Arrays.asList(ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-1")), ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-2")), ProduceRequest.ProduceRequestHeader.create("header-2", ByteString.copyFromUtf8("value-3")))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf82.toByteArray())).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals(copyFromUtf82, ByteString.copyFrom((byte[]) record.value()));
        Assertions.assertEquals(Arrays.asList(new RecordHeader("header-1", ByteString.copyFromUtf8("value-1").toByteArray()), new RecordHeader("header-1", ByteString.copyFromUtf8("value-2").toByteArray())), ImmutableList.copyOf(record.headers().headers("header-1")));
        Assertions.assertEquals(Collections.singletonList(new RecordHeader("header-2", ByteString.copyFromUtf8("value-3").toByteArray())), ImmutableList.copyOf(record.headers().headers("header-2")));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryAndAvro(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryKeyOnly(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryValueOnly(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("bar");
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.value()));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithPartitionId(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setPartitionId(1).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, 1, readProduceResponse(post).getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithTimestamp(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        Instant ofEpochMilli = Instant.ofEpochMilli(1000L);
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bar")).build()).setTimestamp(ofEpochMilli).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
        Assertions.assertEquals(ofEpochMilli, Instant.ofEpochMilli(record.timestamp()));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithHeaders(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setHeaders(Arrays.asList(ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-1")), ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-2")), ProduceRequest.ProduceRequestHeader.create("header-2", ByteString.copyFromUtf8("value-3")))).setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
        Assertions.assertEquals(Arrays.asList(new RecordHeader("header-1", ByteString.copyFromUtf8("value-1").toByteArray()), new RecordHeader("header-1", ByteString.copyFromUtf8("value-2").toByteArray())), ImmutableList.copyOf(record.headers().headers("header-1")));
        Assertions.assertEquals(Collections.singletonList(new RecordHeader("header-2", ByteString.copyFromUtf8("value-3").toByteArray())), ImmutableList.copyOf(record.headers().headers("header-2")));
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringAndAvro(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.AVRO).setRawSchema("{\"type\": \"string\"}").setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), this.testEnv.schemaRegistry().createAvroDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringKeyOnly(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("foo")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals("foo", record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringValueOnly(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new StringDeserializer(), new StringDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertEquals("bar", record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceNothing(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertNull(record.key());
        Assertions.assertNull(record.value());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceJsonBatch(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("key-" + i)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("value-" + i)).build()).setOriginalSize(0L).build());
        }
        StringBuilder sb = new StringBuilder();
        ObjectMapper objectMapper = this.testEnv.kafkaRest().getObjectMapper();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            sb.append(objectMapper.writeValueAsString((ProduceRequest) it.next()));
        }
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(sb.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ImmutableList<ProduceResponse> readProduceResponses = readProduceResponses(post);
        KafkaJsonDeserializer kafkaJsonDeserializer = new KafkaJsonDeserializer();
        kafkaJsonDeserializer.configure(Collections.emptyMap(), false);
        for (int i2 = 0; i2 < 100; i2++) {
            ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, ((ProduceResponse) readProduceResponses.get(i2)).getPartitionId().intValue(), ((ProduceResponse) readProduceResponses.get(i2)).getOffset().longValue(), kafkaJsonDeserializer, kafkaJsonDeserializer);
            Assertions.assertEquals(((ProduceRequest) arrayList.get(i2)).getKey().map((v0) -> {
                return v0.getData();
            }).map((v0) -> {
                return v0.asText();
            }).orElse(null), record.key());
            Assertions.assertEquals(((ProduceRequest) arrayList.get(i2)).getValue().map((v0) -> {
                return v0.getData();
            }).map((v0) -> {
                return v0.asText();
            }).orElse(null), record.value());
        }
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringBatch(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("key-" + i)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.STRING).setData(TextNode.valueOf("value-" + i)).build()).setOriginalSize(0L).build());
        }
        StringBuilder sb = new StringBuilder();
        ObjectMapper objectMapper = this.testEnv.kafkaRest().getObjectMapper();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            sb.append(objectMapper.writeValueAsString((ProduceRequest) it.next()));
        }
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(sb.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ImmutableList<ProduceResponse> readProduceResponses = readProduceResponses(post);
        StringDeserializer stringDeserializer = new StringDeserializer();
        stringDeserializer.configure(Collections.emptyMap(), false);
        for (int i2 = 0; i2 < 100; i2++) {
            ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, ((ProduceResponse) readProduceResponses.get(i2)).getPartitionId().intValue(), ((ProduceResponse) readProduceResponses.get(i2)).getOffset().longValue(), stringDeserializer, stringDeserializer);
            Assertions.assertEquals(((ProduceRequest) arrayList.get(i2)).getKey().map((v0) -> {
                return v0.getData();
            }).map((v0) -> {
                return v0.textValue();
            }).orElse(null), record.key());
            Assertions.assertEquals(((ProduceRequest) arrayList.get(i2)).getValue().map((v0) -> {
                return v0.getData();
            }).map((v0) -> {
                return v0.textValue();
            }).orElse(null), record.value());
        }
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryBatchWithInvalidData_throwsMultipleBadRequests(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(IntNode.valueOf(2 * i)).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(IntNode.valueOf((2 * i) + 1)).build()).setOriginalSize(0L).build());
        }
        StringBuilder sb = new StringBuilder();
        ObjectMapper objectMapper = this.testEnv.kafkaRest().getObjectMapper();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            sb.append(objectMapper.writeValueAsString((ProduceRequest) it.next()));
        }
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(sb.toString(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ImmutableList<ErrorResponse> readErrorResponses = readErrorResponses(post);
        for (int i2 = 0; i2 < 100; i2++) {
            Assertions.assertEquals(400, ((ErrorResponse) readErrorResponses.get(i2)).getErrorCode());
        }
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithSchemaSubject_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"BINARY\", \"subject\": \"foobar\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithSchemaSubjectStrategy_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"BINARY\", \"subject_name_strategy\": \"TOPIC\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithRawSchema_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"BINARY\", \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithSchemaId_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"BINARY\", \"schema_id\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"BINARY\", \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithSchemaSubject_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"STRING\", \"subject\": \"foobar\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithSchemaSubjectStrategy_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"STRING\", \"subject_name_strategy\": \"TOPIC\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithRawSchema_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"STRING\", \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithSchemaId_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"STRING\", \"schema_id\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceStringWithSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"STRING\", \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithTypeAndSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"AVRO\", \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithTypeAndSchemaId_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"AVRO\", \"schema_id\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithTypeAndLatestSchema_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"type\": \"AVRO\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaSubjectAndSchemaSubjectStrategy_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"subject\": \"foobar\", \"subject_name_strategy\": \"TOPIC\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithSchemaIdAndSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"schema_id\": 1, \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndSchemaId_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\", \"schema_id\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRawSchemaAndSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\", \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRecordSchemaSubjectStrategyAndSchemaVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"subject_name_strategy\": \"RECORD_NAME\", \"schema_version\": 1 } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceAvroWithRecordSchemaSubjectStrategyAndLatestVersion_returnsBadRequest(String str) throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity("{ \"key\": { \"subject_name_strategy\": \"RECORD_NAME\" } }", "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(400, ((ErrorResponse) post.readEntity(ErrorResponse.class)).getErrorCode());
    }

    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void produceBinaryWithLargerSizeMessage(String str) throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("foo");
        byte[] generateBinaryData = generateBinaryData(20971420);
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(copyFromUtf8.toByteArray())).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.BINARY).setData(BinaryNode.valueOf(generateBinaryData)).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ProduceResponse readProduceResponse = readProduceResponse(post);
        Assertions.assertTrue(readProduceResponse.getValue().isPresent());
        Assertions.assertEquals(20971420, ((ProduceResponse.ProduceResponseData) readProduceResponse.getValue().get()).getSize());
        ConsumerRecord record = this.testEnv.kafkaCluster().getRecord(TOPIC_NAME, readProduceResponse.getPartitionId().intValue(), readProduceResponse.getOffset().longValue(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Assertions.assertEquals(copyFromUtf8, ByteString.copyFrom((byte[]) record.key()));
        Assertions.assertEquals(20971420, record.serializedValueSize());
        Assertions.assertEquals(Arrays.toString(generateBinaryData), Arrays.toString((byte[]) record.value()));
    }

    private void doByteLimitReachedTest() throws Exception {
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build(), "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        ImmutableList<ErrorResponse> readErrorResponses = readErrorResponses(post);
        Assertions.assertEquals(readErrorResponses.size(), 1);
        Assertions.assertEquals(((ErrorResponse) readErrorResponses.get(0)).getErrorCode(), 429);
    }

    @DisplayName("test_whenGlobalByteLimitReached_thenCallerIsRateLimited")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void test_whenGlobalByteLimitReached_thenCallerIsRateLimited(String str) throws Exception {
        doByteLimitReachedTest();
    }

    @DisplayName("test_whenClusterByteLimitReached_thenCallerIsRateLimited")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void test_whenClusterByteLimitReached_thenCallerIsRateLimited(String str) throws Exception {
        doByteLimitReachedTest();
    }

    private void doCountLimitTest() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProduceRequest build = ProduceRequest.builder().setKey(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("foo")).build()).setValue(ProduceRequest.ProduceRequestData.builder().setFormat(EmbeddedFormat.JSON).setData(TextNode.valueOf("bar")).build()).setOriginalSize(0L).build();
        Response post = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(build, "application/json"));
        Response post2 = this.testEnv.kafkaRest().target().path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records").request().accept(new String[]{"application/json"}).post(Entity.entity(build, "application/json"));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post2.getStatus());
        ImmutableList<ErrorResponse> readErrorResponses = readErrorResponses(post2);
        Assertions.assertEquals(readErrorResponses.size(), 1);
        Assertions.assertEquals(((ErrorResponse) readErrorResponses.get(0)).getErrorCode(), 429);
    }

    @DisplayName("test_whenGlobalRequestCountLimitReached_thenCallerIsRateLimited")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void test_whenGlobalRequestCountLimitReached_thenCallerIsRateLimited(String str) throws Exception {
        doCountLimitTest();
    }

    @DisplayName("test_whenClusterRequestCountLimitReached_thenCallerIsRateLimited")
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void test_whenClusterRequestCountLimitReached_thenCallerIsRateLimited(String str) throws Exception {
        doCountLimitTest();
    }

    private static ProduceResponse readProduceResponse(Response response) {
        response.bufferEntity();
        try {
            return (ProduceResponse) response.readEntity(ProduceResponse.class);
        } catch (ProcessingException e) {
            throw new RuntimeException(((ErrorResponse) response.readEntity(ErrorResponse.class)).toString(), e);
        }
    }

    private static ImmutableList<ProduceResponse> readProduceResponses(Response response) {
        return ImmutableList.copyOf((Iterator) response.readEntity(new GenericType<MappingIterator<ProduceResponse>>() { // from class: io.confluent.kafkarest.integration.v3.ProduceActionIntegrationTest.1
        }));
    }

    private static ImmutableList<ErrorResponse> readErrorResponses(Response response) {
        return ImmutableList.copyOf((Iterator) response.readEntity(new GenericType<MappingIterator<ErrorResponse>>() { // from class: io.confluent.kafkarest.integration.v3.ProduceActionIntegrationTest.2
        }));
    }

    private static byte[] generateBinaryData(int i) {
        byte[] bArr = new byte[i];
        Arrays.fill(bArr, (byte) 1);
        return bArr;
    }
}
