package io.confluent.kafkarest.integration.v3;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.testing.DefaultKafkaRestTestEnvironment;
import io.confluent.kafkarest.testing.SchemaRegistryFixture;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
@Ignore
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/ProduceActionIntegrationTest.class */
public class ProduceActionIntegrationTest {
    /** Topic that {@code setUp()} creates and that every produce request in this class targets. */
    private static final String TOPIC_NAME = "topic-1";

    /** Key subject for {@link #TOPIC_NAME}, spelled {@code <topic>-key}. */
    private static final String DEFAULT_KEY_SUBJECT = TOPIC_NAME + "-key";

    /** Value subject for {@link #TOPIC_NAME}, spelled {@code <topic>-value}. */
    private static final String DEFAULT_VALUE_SUBJECT = TOPIC_NAME + "-value";

    /** JUnit rule through which the tests reach the Kafka cluster, the REST server and the schema registry. */
    @Rule
    public final DefaultKafkaRestTestEnvironment testEnv = new DefaultKafkaRestTestEnvironment();

    /**
     * Creates {@link #TOPIC_NAME} on the test cluster before each test.
     *
     * @throws Exception if the topic cannot be created
     */
    @Before
    public void setUp() throws Exception {
        // NOTE(review): presumably (partition count, replication factor) - confirm against createTopic.
        final int partitions = 3;
        final short replicationFactor = 1;
        testEnv.kafkaCluster().createTopic(TOPIC_NAME, partitions, replicationFactor);
    }

    /**
     * Posts a BINARY key ("foo") and value ("bar"), then reads the record back from the partition
     * and offset named in the response and checks that it carries the same bytes.
     */
    @Test
    public void produceBinary() throws Exception {
        ByteString expectedKey = ByteString.copyFromUtf8("foo");
        ByteString expectedValue = ByteString.copyFromUtf8("bar");
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedKey.toByteArray()))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(expectedValue.toByteArray()))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                new ByteArrayDeserializer(),
                                new ByteArrayDeserializer());
        Assert.assertEquals(expectedKey, ByteString.copyFrom((byte[]) stored.key()));
        Assert.assertEquals(expectedValue, ByteString.copyFrom((byte[]) stored.value()));
    }

    /**
     * Posts BINARY key and value whose data is a JSON null and checks that the stored record has a
     * null key and a null value.
     */
    @Test
    public void produceBinaryWithNullData() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                new ByteArrayDeserializer(),
                                new ByteArrayDeserializer());
        Assert.assertNull(stored.key());
        Assert.assertNull(stored.value());
    }

    /**
     * Posts BINARY key and value carrying an integer node and the text "fooba". The test expects
     * HTTP status 200 with an {@link ErrorResponse} body whose error code is 400.
     */
    @Test
    public void produceBinaryWithInvalidData_throwsBadRequest() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(IntNode.valueOf(1))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(TextNode.valueOf("fooba"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        // The failure is reported in the response body, not in the HTTP status line.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400, error.getErrorCode());
    }

    /**
     * Posts a hand-written JSON body containing an unknown top-level field ("records"). The test
     * expects HTTP status 200 with an {@link ErrorResponse} body carrying error code 400 and the
     * exact unrecognized-field message.
     */
    @Test
    public void produceWithInvalidData_throwsBadRequest() throws Exception {
        String malformedBody = "{ \"records\": {\"subject\": \"foobar\" } }";
        String expectedMessage = "Unrecognized field \"records\" (class io.confluent.kafkarest.entities.v3.AutoValue_ProduceRequest$Builder), not marked as ignorable (6 known properties: \"value\", \"originalSize\", \"partitionId\", \"headers\", \"key\", \"timestamp\"])";
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(malformedBody, "application/json"));

        // The failure is reported in the response body, not in the HTTP status line.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400, error.getErrorCode());
        Assert.assertEquals(expectedMessage, error.getMessage());
    }

    /**
     * Posts a JSON key ("foo") and value ("bar") and checks that the stored record deserializes
     * back to the same strings with {@link KafkaJsonDeserializer}.
     */
    @Test
    public void produceJson() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        // One deserializer instance is used for both the key and the value.
        KafkaJsonDeserializer<Object> jsonDeserializer = new KafkaJsonDeserializer<>();
        jsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                jsonDeserializer,
                                jsonDeserializer);
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Posts JSON key and value whose data is a JSON null and checks that the stored record has a
     * null key and a null value.
     */
    @Test
    public void produceJsonWithNullData() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSON)
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        // One deserializer instance is used for both the key and the value.
        KafkaJsonDeserializer<Object> jsonDeserializer = new KafkaJsonDeserializer<>();
        jsonDeserializer.configure(Collections.emptyMap(), false);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                jsonDeserializer,
                                jsonDeserializer);
        Assert.assertNull(stored.key());
        Assert.assertNull(stored.value());
    }

    /**
     * Posts AVRO key ("foo") and value ("bar"), each with an inline raw string schema, and checks
     * the stored record through the schema registry's Avro deserializer.
     */
    @Test
    public void produceAvroWithRawSchema() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Posts AVRO key and value with an inline raw string schema and JSON-null data, then checks
     * that the stored record has a null key and a null value.
     *
     * <p>NOTE(review): the method name says "throwsBadRequest", but the assertions below expect a
     * successful produce. Confirm which of the two is intended.
     */
    @Test
    public void produceAvroWithRawSchemaAndNullData_throwsBadRequest() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(NullNode.getInstance())
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertNull(stored.key());
        Assert.assertNull(stored.value());
    }

    /**
     * Posts AVRO key and value that declare a string schema but carry integer data. The test
     * expects HTTP status 200 with an {@link ErrorResponse} body whose error code is 400.
     */
    @Test
    public void produceAvroWithRawSchemaAndInvalidData() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(IntNode.valueOf(1))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(IntNode.valueOf(2))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        // The failure is reported in the response body, not in the HTTP status line.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400, error.getErrorCode());
    }

    /**
     * Registers string Avro schemas under the default key and value subjects, posts key and value
     * that reference them by schema id only, and checks the stored record.
     */
    @Test
    public void produceAvroWithSchemaId() throws Exception {
        // Key schema is registered before the value schema, as in the request construction order.
        SchemaRegistryFixture.SchemaKey keySchema =
                testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(keySchema.getSchemaId())
                        .setData(TextNode.valueOf("foo"))
                        .build();
        SchemaRegistryFixture.SchemaKey valueSchema =
                testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(valueSchema.getSchemaId())
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Registers string Avro schemas under the default key and value subjects, posts key and value
     * that reference them by schema version only, and checks the stored record.
     */
    @Test
    public void produceAvroWithSchemaVersion() throws Exception {
        // Key schema is registered before the value schema, as in the request construction order.
        SchemaRegistryFixture.SchemaKey keySchema =
                testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(keySchema.getSchemaVersion())
                        .setData(TextNode.valueOf("foo"))
                        .build();
        SchemaRegistryFixture.SchemaKey valueSchema =
                testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(valueSchema.getSchemaVersion())
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Registers string Avro schemas under the default key and value subjects, then posts key and
     * value that carry data only (no format, schema id, version or raw schema) and checks the
     * stored record through the Avro deserializer.
     */
    @Test
    public void produceAvroWithLatestSchema() throws Exception {
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder().setData(TextNode.valueOf("foo")).build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder().setData(TextNode.valueOf("bar")).build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Posts AVRO key and value with inline raw string schemas and explicit subjects
     * ("my-key-subject" / "my-value-subject"), then checks the stored record.
     */
    @Test
    public void produceAvroWithRawSchemaAndSubject() throws Exception {
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setSubject("my-key-subject")
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setSubject("my-value-subject")
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Registers string Avro schemas under "my-key-subject" and "my-value-subject", posts key and
     * value that name those subjects together with the registered schema ids, and checks the
     * stored record.
     */
    @Test
    public void produceAvroWithSchemaIdAndSubject() throws Exception {
        // Key schema is registered before the value schema, as in the request construction order.
        SchemaRegistryFixture.SchemaKey keySchema =
                testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setSchemaId(keySchema.getSchemaId())
                        .setData(TextNode.valueOf("foo"))
                        .build();
        SchemaRegistryFixture.SchemaKey valueSchema =
                testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setSchemaId(valueSchema.getSchemaId())
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Registers string Avro schemas under "my-key-subject" and "my-value-subject", posts key and
     * value that name those subjects together with the registered schema versions, and checks the
     * stored record.
     */
    @Test
    public void produceAvroWithSchemaVersionAndSubject() throws Exception {
        // Key schema is registered before the value schema, as in the request construction order.
        SchemaRegistryFixture.SchemaKey keySchema =
                testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setSchemaVersion(keySchema.getSchemaVersion())
                        .setData(TextNode.valueOf("foo"))
                        .build();
        SchemaRegistryFixture.SchemaKey valueSchema =
                testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setSchemaVersion(valueSchema.getSchemaVersion())
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Registers string Avro schemas under "my-key-subject" and "my-value-subject", then posts key
     * and value that carry only the subject and the data, and checks the stored record through the
     * Avro deserializer.
     */
    @Test
    public void produceAvroWithLatestSchemaAndSubject() throws Exception {
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        testEnv.schemaRegistry().createSchema("my-key-subject", new AvroSchema("{\"type\": \"string\"}"));
        testEnv.schemaRegistry().createSchema("my-value-subject", new AvroSchema("{\"type\": \"string\"}"));
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setData(TextNode.valueOf("foo"))
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setData(TextNode.valueOf("bar"))
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals("foo", stored.key());
        Assert.assertEquals("bar", stored.value());
    }

    /**
     * Posts AVRO record-typed key and value with inline raw schemas ("MyKey" / "MyValue") and the
     * RECORD_NAME subject name strategy, then compares the stored key and value with Avro generic
     * records built from the same schemas.
     */
    @Test
    public void produceAvroWithRawSchemaAndSubjectStrategy() throws Exception {
        String keySchemaJson = "{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}";
        String valueSchemaJson = "{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}";
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ObjectNode keyJson = JsonNodeFactory.instance.objectNode().put("foo", "foz");
        ObjectNode valueJson = JsonNodeFactory.instance.objectNode().put("bar", "baz");
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema(keySchemaJson)
                        .setData(keyJson)
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema(valueSchemaJson)
                        .setData(valueJson)
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());

        GenericData.Record expectedKey = new GenericData.Record(new AvroSchema(keySchemaJson).rawSchema());
        expectedKey.put("foo", "foz");
        GenericData.Record expectedValue = new GenericData.Record(new AvroSchema(valueSchemaJson).rawSchema());
        expectedValue.put("bar", "baz");
        Assert.assertEquals(expectedKey, stored.key());
        Assert.assertEquals(expectedValue, stored.value());
    }

    /**
     * Registers record-typed Avro schemas ("MyKey" / "MyValue") under the subjects computed by
     * {@link RecordNameStrategy}, posts key and value that reference them by schema id with the
     * RECORD_NAME subject name strategy, and compares the stored key and value with Avro generic
     * records built from the same schemas.
     */
    @Test
    public void produceAvroWithSchemaIdAndSubjectStrategy() throws Exception {
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Declared as AvroSchema rather than ParsedSchema: ParsedSchema.rawSchema() is typed Object,
        // while GenericData.Record needs the Avro Schema that AvroSchema.rawSchema() returns.
        AvroSchema keySchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        String keySubject = new RecordNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        SchemaRegistryFixture.SchemaKey registeredKey = testEnv.schemaRegistry().createSchema(keySubject, keySchema);

        AvroSchema valueSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        String valueSubject = new RecordNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        SchemaRegistryFixture.SchemaKey registeredValue =
                testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(registeredKey.getSchemaId())
                        .setData(keyJson)
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(registeredValue.getSchemaId())
                        .setData(valueJson)
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());

        GenericData.Record expectedKey = new GenericData.Record(keySchema.rawSchema());
        expectedKey.put("foo", "foz");
        GenericData.Record expectedValue = new GenericData.Record(valueSchema.rawSchema());
        expectedValue.put("bar", "baz");
        Assert.assertEquals(expectedKey, stored.key());
        Assert.assertEquals(expectedValue, stored.value());
    }

    /**
     * Registers record-typed Avro schemas ("MyKey" / "MyValue") under the subjects computed by
     * {@link TopicNameStrategy}, posts key and value that reference them by schema version with the
     * TOPIC_NAME subject name strategy, and compares the stored key and value with Avro generic
     * records built from the same schemas.
     */
    @Test
    public void produceAvroWithSchemaVersionAndSubjectStrategy() throws Exception {
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Declared as AvroSchema rather than ParsedSchema: ParsedSchema.rawSchema() is typed Object,
        // while GenericData.Record needs the Avro Schema that AvroSchema.rawSchema() returns.
        AvroSchema keySchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        String keySubject = new TopicNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        SchemaRegistryFixture.SchemaKey registeredKey = testEnv.schemaRegistry().createSchema(keySubject, keySchema);

        AvroSchema valueSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        String valueSubject = new TopicNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        SchemaRegistryFixture.SchemaKey registeredValue =
                testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setSchemaVersion(registeredKey.getSchemaVersion())
                        .setData(keyJson)
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setSchemaVersion(registeredValue.getSchemaVersion())
                        .setData(valueJson)
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());

        GenericData.Record expectedKey = new GenericData.Record(keySchema.rawSchema());
        expectedKey.put("foo", "foz");
        GenericData.Record expectedValue = new GenericData.Record(valueSchema.rawSchema());
        expectedValue.put("bar", "baz");
        Assert.assertEquals(expectedKey, stored.key());
        Assert.assertEquals(expectedValue, stored.value());
    }

    /**
     * Registers record-typed Avro schemas ("MyKey" / "MyValue") under the subjects computed by
     * {@link TopicNameStrategy}, posts key and value that carry only the TOPIC_NAME subject name
     * strategy and the data, and compares the stored key and value with Avro generic records built
     * from the same schemas.
     */
    @Test
    public void produceAvroWithLatestSchemaAndSubjectStrategy() throws Exception {
        String recordsPath =
                "/v3/clusters/" + testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Declared as AvroSchema rather than ParsedSchema: ParsedSchema.rawSchema() is typed Object,
        // while GenericData.Record needs the Avro Schema that AvroSchema.rawSchema() returns.
        AvroSchema keySchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyKey\", \"fields\": [{\"name\": \"foo\", \"type\": \"string\"}]}");
        String keySubject = new TopicNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        testEnv.schemaRegistry().createSchema(keySubject, keySchema);

        AvroSchema valueSchema = new AvroSchema("{\"type\": \"record\", \"name\": \"MyValue\", \"fields\": [{\"name\": \"bar\", \"type\": \"string\"}]}");
        String valueSubject = new TopicNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");
        ProduceRequest.ProduceRequestData keyData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setData(keyJson)
                        .build();
        ProduceRequest.ProduceRequestData valueData =
                ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setData(valueJson)
                        .build();
        ProduceRequest request = ProduceRequest.builder().setKey(keyData).setValue(valueData).build();

        Response response =
                testEnv.kafkaRest()
                        .target()
                        .path(recordsPath)
                        .request()
                        .accept("application/json")
                        .post(Entity.entity(request, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
                testEnv.kafkaCluster()
                        .getRecord(
                                TOPIC_NAME,
                                produced.getPartitionId(),
                                produced.getOffset(),
                                testEnv.schemaRegistry().createAvroDeserializer(),
                                testEnv.schemaRegistry().createAvroDeserializer());

        GenericData.Record expectedKey = new GenericData.Record(keySchema.rawSchema());
        expectedKey.put("foo", "foz");
        GenericData.Record expectedValue = new GenericData.Record(valueSchema.rawSchema());
        expectedValue.put("bar", "baz");
        Assert.assertEquals(expectedKey, stored.key());
        Assert.assertEquals(expectedValue, stored.value());
    }

    /**
     * Produces a record whose key and value each carry an inline JSON Schema
     * ({@code {"type": "string"}}) plus text data, expects HTTP 200, then reads the record back
     * from the partition and offset reported in the response and checks that both the key and the
     * value equal the nodes that were sent.
     */
    @Test
    public void produceJsonschemaWithRawSchema() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        // Fetch exactly the record the response points at and decode it with JSON Schema deserializers.
        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Produces a record whose key and value declare an inline JSON Schema but carry a JSON null
     * as data, expects HTTP 200, and checks that the stored record has a null key and a null value.
     */
    @Test
    public void produceJsonschemaWithRawSchemaAndNullData() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(NullNode.getInstance())
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(NullNode.getInstance())
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertNull(stored.key());
        Assert.assertNull(stored.value());
    }

    /**
     * Sends integer data against an inline JSON Schema of type string and checks that the HTTP
     * status is 200 while the response body deserializes to an {@link ErrorResponse} whose error
     * code is 400.
     *
     * <p>NOTE(review): the failure is presumably reported per record in the body rather than via
     * the HTTP status - confirm against the produce endpoint.
     */
    @Test
    public void produceJsonschemaWithRawSchemaAndInvalidData_throwsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(IntNode.valueOf(1))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(IntNode.valueOf(2))
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Registers string JSON Schemas under {@code DEFAULT_KEY_SUBJECT} and
     * {@code DEFAULT_VALUE_SUBJECT}, produces a record that references them only by the schema ids
     * returned from registration, and checks that the stored key and value equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaId() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(Integer.valueOf(registeredKey.getSchemaId()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(Integer.valueOf(registeredValue.getSchemaId()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers string JSON Schemas under {@code DEFAULT_KEY_SUBJECT} and
     * {@code DEFAULT_VALUE_SUBJECT}, produces a record that references them only by the schema
     * versions returned from registration, and checks that the stored key and value equal the sent
     * nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaVersion() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(Integer.valueOf(registeredKey.getSchemaVersion()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(Integer.valueOf(registeredValue.getSchemaVersion()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers string JSON Schemas under {@code DEFAULT_KEY_SUBJECT} and
     * {@code DEFAULT_VALUE_SUBJECT}, then produces a record whose key and value carry data only
     * (no format, raw schema, id, version or subject), and checks that the stored key and value
     * equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithLatestSchema() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(ProduceRequest.ProduceRequestData.builder().setData(keyNode).build())
                .setValue(ProduceRequest.ProduceRequestData.builder().setData(valueNode).build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Produces a record whose key and value each carry an inline string JSON Schema together with
     * an explicit subject ({@code my-key-subject} / {@code my-value-subject}), and checks that the
     * stored key and value equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithRawSchemaAndSubject() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setSubject("my-key-subject")
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setSubject("my-value-subject")
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers string JSON Schemas under {@code my-key-subject} and {@code my-value-subject},
     * produces a record that names those subjects together with the registered schema ids, and
     * checks that the stored key and value equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaIdAndSubject() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setSchemaId(Integer.valueOf(registeredKey.getSchemaId()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setSchemaId(Integer.valueOf(registeredValue.getSchemaId()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers string JSON Schemas under {@code my-key-subject} and {@code my-value-subject},
     * produces a record that names those subjects together with the registered schema versions,
     * and checks that the stored key and value equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaVersionAndSubject() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setSchemaVersion(Integer.valueOf(registeredKey.getSchemaVersion()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setSchemaVersion(Integer.valueOf(registeredValue.getSchemaVersion()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers string JSON Schemas under {@code my-key-subject} and {@code my-value-subject},
     * produces a record that names only those subjects alongside the data (no id, version or raw
     * schema), and checks that the stored key and value equal the sent nodes.
     */
    @Test
    public void produceJsonschemaWithLatestSchemaAndSubject() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        this.testEnv.schemaRegistry().createSchema("my-key-subject", new JsonSchema("{\"type\": \"string\"}"));
        this.testEnv.schemaRegistry().createSchema("my-value-subject", new JsonSchema("{\"type\": \"string\"}"));
        TextNode keyNode = TextNode.valueOf("foo");
        TextNode valueNode = TextNode.valueOf("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Produces a record whose key and value each carry an inline object JSON Schema (titled
     * {@code MyKey} / {@code MyValue}) and select the {@code RECORD_NAME} subject name strategy,
     * then checks that the stored key and value equal the sent object nodes.
     */
    @Test
    public void produceJsonschemaWithRawSchemaAndSubjectStrategy() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}")
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.JSONSCHEMA)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}")
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers object JSON Schemas (titled {@code MyKey} / {@code MyValue}) under the subjects
     * computed by {@link RecordNameStrategy}, produces a record that selects the
     * {@code RECORD_NAME} strategy together with the registered schema ids, and checks that the
     * stored key and value equal the sent object nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaIdAndSubjectStrategy() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Key schema first, then value schema, each registered under its strategy-derived subject.
        ParsedSchema keySchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        String keySubject = new RecordNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        SchemaRegistryFixture.SchemaKey registeredKey = this.testEnv.schemaRegistry().createSchema(keySubject, keySchema);
        ParsedSchema valueSchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        String valueSubject = new RecordNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        SchemaRegistryFixture.SchemaKey registeredValue = this.testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(Integer.valueOf(registeredKey.getSchemaId()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(Integer.valueOf(registeredValue.getSchemaId()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers object JSON Schemas (titled {@code MyKey} / {@code MyValue}) under the subjects
     * computed by {@link TopicNameStrategy}, produces a record that selects the
     * {@code TOPIC_NAME} strategy together with the registered schema versions, and checks that
     * the stored key and value equal the sent object nodes.
     */
    @Test
    public void produceJsonschemaWithSchemaVersionAndSubjectStrategy() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Key schema first, then value schema, each registered under its strategy-derived subject.
        ParsedSchema keySchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        String keySubject = new TopicNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        SchemaRegistryFixture.SchemaKey registeredKey = this.testEnv.schemaRegistry().createSchema(keySubject, keySchema);
        ParsedSchema valueSchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        String valueSubject = new TopicNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        SchemaRegistryFixture.SchemaKey registeredValue = this.testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setSchemaVersion(Integer.valueOf(registeredKey.getSchemaVersion()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setSchemaVersion(Integer.valueOf(registeredValue.getSchemaVersion()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Registers object JSON Schemas (titled {@code MyKey} / {@code MyValue}) under the subjects
     * computed by {@link TopicNameStrategy}, produces a record that selects the
     * {@code TOPIC_NAME} strategy and supplies data only (no id, version or raw schema), and
     * checks that the stored key and value equal the sent object nodes.
     */
    @Test
    public void produceJsonschemaWithLatestSchemaAndSubjectStrategy() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Key schema first, then value schema, each registered under its strategy-derived subject.
        ParsedSchema keySchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyKey\", \"properties\": {\"foo\": {\"type\": \"string\"}}}");
        String keySubject = new TopicNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        this.testEnv.schemaRegistry().createSchema(keySubject, keySchema);
        ParsedSchema valueSchema =
            new JsonSchema("{\"type\": \"object\", \"title\": \"MyValue\", \"properties\": {\"bar\": {\"type\": \"string\"}}}");
        String valueSubject = new TopicNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        this.testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.TOPIC_NAME)
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer(),
                    this.testEnv.schemaRegistry().createJsonSchemaDeserializer());
        Assert.assertEquals(keyNode, stored.key());
        Assert.assertEquals(valueNode, stored.value());
    }

    /**
     * Produces a record whose key and value each carry an inline Protobuf schema (the canonical
     * string of {@code MyKey} / {@code MyValue}) plus JSON object data, then reads the record back
     * with Protobuf deserializers and compares the serialized bytes of each side against a
     * {@link DynamicMessage} built from the same schema and field value.
     */
    @Test
    public void produceProtobufWithRawSchema() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(keySchema.canonicalString())
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(valueSchema.canonicalString())
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer());

        // Build the expected messages from the same schemas; comparison is on serialized bytes.
        DynamicMessage.Builder expectedKey = DynamicMessage.newBuilder(keySchema.toDescriptor());
        expectedKey.setField(keySchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder expectedValue = DynamicMessage.newBuilder(valueSchema.toDescriptor());
        expectedValue.setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz");

        Message actualKey = (Message) stored.key();
        Message actualValue = (Message) stored.value();
        Assert.assertEquals(expectedKey.build().toByteString(), actualKey.toByteString());
        Assert.assertEquals(expectedValue.build().toByteString(), actualValue.toByteString());
    }

    /**
     * Produces a record whose key and value declare an inline Protobuf schema but carry a JSON
     * null as data, expects HTTP 200, and checks that the stored record has a null key and a null
     * value.
     */
    @Test
    public void produceProtobufWithRawSchemaAndNullData() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(
                            new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }").canonicalString())
                        .setData(NullNode.getInstance())
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(
                            new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }").canonicalString())
                        .setData(NullNode.getInstance())
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer());
        Assert.assertNull(stored.key());
        Assert.assertNull(stored.value());
    }

    /**
     * Sends bare integer data against inline Protobuf message schemas and checks that the HTTP
     * status is 200 while the response body deserializes to an {@link ErrorResponse} whose error
     * code is 400.
     *
     * <p>NOTE(review): the failure is presumably reported per record in the body rather than via
     * the HTTP status - confirm against the produce endpoint.
     */
    @Test
    public void produceProtobufWithRawSchemaAndInvalidData_throwsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(
                            new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }").canonicalString())
                        .setData(IntNode.valueOf(1))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setRawSchema(
                            new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }").canonicalString())
                        .setData(IntNode.valueOf(2))
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Registers Protobuf schemas ({@code MyKey} / {@code MyValue}) under
     * {@code DEFAULT_KEY_SUBJECT} and {@code DEFAULT_VALUE_SUBJECT}, produces a record that
     * references them only by the schema ids returned from registration, then reads the record
     * back with Protobuf deserializers and compares the serialized bytes of each side against a
     * {@link DynamicMessage} built from the same schema and field value.
     */
    @Test
    public void produceProtobufWithSchemaId() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Declared as ProtobufSchema (not ParsedSchema) because toDescriptor() below is only
        // available on the concrete Protobuf type.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, valueSchema);

        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(Integer.valueOf(registeredKey.getSchemaId()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaId(Integer.valueOf(registeredValue.getSchemaId()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer());

        // Build the expected messages from the same schemas; comparison is on serialized bytes.
        DynamicMessage.Builder expectedKey = DynamicMessage.newBuilder(keySchema.toDescriptor());
        expectedKey.setField(keySchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder expectedValue = DynamicMessage.newBuilder(valueSchema.toDescriptor());
        expectedValue.setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz");

        Message actualKey = (Message) stored.key();
        Message actualValue = (Message) stored.value();
        Assert.assertEquals(expectedKey.build().toByteString(), actualKey.toByteString());
        Assert.assertEquals(expectedValue.build().toByteString(), actualValue.toByteString());
    }

    /**
     * Registers Protobuf schemas ({@code MyKey} / {@code MyValue}) under
     * {@code DEFAULT_KEY_SUBJECT} and {@code DEFAULT_VALUE_SUBJECT}, produces a record that
     * references them only by the schema versions returned from registration, then reads the
     * record back with Protobuf deserializers and compares the serialized bytes of each side
     * against a {@link DynamicMessage} built from the same schema and field value.
     */
    @Test
    public void produceProtobufWithSchemaVersion() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";

        // Declared as ProtobufSchema (not ParsedSchema) because toDescriptor() below is only
        // available on the concrete Protobuf type.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey registeredKey =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey registeredValue =
            this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, valueSchema);

        ObjectNode keyNode = new ObjectNode(JsonNodeFactory.instance);
        keyNode.put("foo", "foz");
        ObjectNode valueNode = new ObjectNode(JsonNodeFactory.instance);
        valueNode.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(Integer.valueOf(registeredKey.getSchemaVersion()))
                        .setData(keyNode)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSchemaVersion(Integer.valueOf(registeredValue.getSchemaVersion()))
                        .setData(valueNode)
                        .build())
                .build();

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> stored =
            this.testEnv
                .kafkaCluster()
                .getRecord(
                    TOPIC_NAME,
                    produced.getPartitionId(),
                    produced.getOffset(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer(),
                    this.testEnv.schemaRegistry().createProtobufDeserializer());

        // Build the expected messages from the same schemas; comparison is on serialized bytes.
        DynamicMessage.Builder expectedKey = DynamicMessage.newBuilder(keySchema.toDescriptor());
        expectedKey.setField(keySchema.toDescriptor().findFieldByName("foo"), "foz");
        DynamicMessage.Builder expectedValue = DynamicMessage.newBuilder(valueSchema.toDescriptor());
        expectedValue.setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz");

        Message actualKey = (Message) stored.key();
        Message actualValue = (Message) stored.value();
        Assert.assertEquals(expectedKey.build().toByteString(), actualKey.toByteString());
        Assert.assertEquals(expectedValue.build().toByteString(), actualValue.toByteString());
    }

    /**
     * Registers protobuf key and value schemas under {@code DEFAULT_KEY_SUBJECT} and
     * {@code DEFAULT_VALUE_SUBJECT}, then produces a record whose key and value carry only JSON
     * data (no format, schema id, version, raw schema, or subject). Checks that the record read
     * back deserializes to the expected protobuf messages.
     *
     * <p>NOTE(review): per the test name the server is expected to pick the latest registered
     * schema for each subject; that resolution happens outside this method.
     *
     * @throws Exception if registering the schemas, producing, or consuming fails
     */
    @Test
    public void produceProtobufWithLatestSchema() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        // Typed as ProtobufSchema: toDescriptor() is declared on ProtobufSchema, not on ParsedSchema.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        this.testEnv.schemaRegistry().createSchema(DEFAULT_KEY_SUBJECT, keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        this.testEnv.schemaRegistry().createSchema(DEFAULT_VALUE_SUBJECT, valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(ProduceRequest.ProduceRequestData.builder().setData(keyJson).build())
                .setValue(ProduceRequest.ProduceRequestData.builder().setData(valueJson).build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Produces a record whose key and value each specify the PROTOBUF format, an explicit subject
     * ("my-key-subject" / "my-value-subject"), the raw schema text, and JSON data, then checks
     * that the record read back deserializes to the expected protobuf messages.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceProtobufWithRawSchemaAndSubject() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setSubject("my-key-subject")
                        .setRawSchema(keySchema.canonicalString())
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setSubject("my-value-subject")
                        .setRawSchema(valueSchema.canonicalString())
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Registers protobuf key and value schemas under the subjects "my-key-schema" and
     * "my-value-schema", produces a record whose key and value each carry that subject, the
     * registered schema id, and JSON data, and checks that the record read back deserializes to
     * the expected protobuf messages.
     *
     * @throws Exception if registering the schemas, producing, or consuming fails
     */
    @Test
    public void produceProtobufWithSchemaIdAndSubject() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        // Typed as ProtobufSchema: toDescriptor() is declared on ProtobufSchema, not on ParsedSchema.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey keySchemaKey = this.testEnv.schemaRegistry().createSchema("my-key-schema", keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey valueSchemaKey = this.testEnv.schemaRegistry().createSchema("my-value-schema", valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-schema")
                        .setSchemaId(Integer.valueOf(keySchemaKey.getSchemaId()))
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-schema")
                        .setSchemaId(Integer.valueOf(valueSchemaKey.getSchemaId()))
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Registers protobuf key and value schemas under the subjects "my-key-schema" and
     * "my-value-schema", produces a record whose key and value each carry that subject, the
     * registered schema version, and JSON data, and checks that the record read back
     * deserializes to the expected protobuf messages.
     *
     * @throws Exception if registering the schemas, producing, or consuming fails
     */
    @Test
    public void produceProtobufWithSchemaVersionAndSubject() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        // Typed as ProtobufSchema: toDescriptor() is declared on ProtobufSchema, not on ParsedSchema.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        SchemaRegistryFixture.SchemaKey keySchemaKey = this.testEnv.schemaRegistry().createSchema("my-key-schema", keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        SchemaRegistryFixture.SchemaKey valueSchemaKey = this.testEnv.schemaRegistry().createSchema("my-value-schema", valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-schema")
                        .setSchemaVersion(Integer.valueOf(keySchemaKey.getSchemaVersion()))
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-schema")
                        .setSchemaVersion(Integer.valueOf(valueSchemaKey.getSchemaVersion()))
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Registers protobuf key and value schemas under the subjects "my-key-subject" and
     * "my-value-subject", then produces a record whose key and value each carry only that
     * subject and JSON data. Checks that the record read back deserializes to the expected
     * protobuf messages.
     *
     * <p>NOTE(review): per the test name the server is expected to pick the latest schema
     * registered under each subject; that resolution happens outside this method.
     *
     * @throws Exception if registering the schemas, producing, or consuming fails
     */
    @Test
    public void produceProtobufWithLatestSchemaAndSubject() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        // Typed as ProtobufSchema: toDescriptor() is declared on ProtobufSchema, not on ParsedSchema.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        this.testEnv.schemaRegistry().createSchema("my-key-subject", keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        this.testEnv.schemaRegistry().createSchema("my-value-subject", valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-key-subject")
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubject("my-value-subject")
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Produces a record whose key and value each specify the PROTOBUF format, the
     * {@code RECORD_NAME} subject name strategy, the raw schema text, and JSON data, then checks
     * that the record read back deserializes to the expected protobuf messages.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceProtobufWithRawSchemaAndSubjectStrategy() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema(keySchema.canonicalString())
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.PROTOBUF)
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setRawSchema(valueSchema.canonicalString())
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Registers protobuf key and value schemas under the subjects computed by
     * {@link RecordNameStrategy} for {@code TOPIC_NAME}, produces a record whose key and value
     * each carry the {@code RECORD_NAME} subject name strategy, the registered schema id, and
     * JSON data, and checks that the record read back deserializes to the expected protobuf
     * messages.
     *
     * @throws Exception if registering the schemas, producing, or consuming fails
     */
    @Test
    public void produceProtobufWithSchemaIdAndSubjectStrategy() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        // Typed as ProtobufSchema: toDescriptor() is declared on ProtobufSchema, not on ParsedSchema.
        ProtobufSchema keySchema = new ProtobufSchema("syntax = \"proto3\"; message MyKey { string foo = 1; }");
        String keySubject = new RecordNameStrategy().subjectName(TOPIC_NAME, true, keySchema);
        SchemaRegistryFixture.SchemaKey keySchemaKey = this.testEnv.schemaRegistry().createSchema(keySubject, keySchema);
        ProtobufSchema valueSchema = new ProtobufSchema("syntax = \"proto3\"; message MyValue { string bar = 1; }");
        String valueSubject = new RecordNameStrategy().subjectName(TOPIC_NAME, false, valueSchema);
        SchemaRegistryFixture.SchemaKey valueSchemaKey = this.testEnv.schemaRegistry().createSchema(valueSubject, valueSchema);

        ObjectNode keyJson = new ObjectNode(JsonNodeFactory.instance);
        keyJson.put("foo", "foz");
        ObjectNode valueJson = new ObjectNode(JsonNodeFactory.instance);
        valueJson.put("bar", "baz");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(Integer.valueOf(keySchemaKey.getSchemaId()))
                        .setData(keyJson)
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setSubjectNameStrategy(ProduceRequest.EnumSubjectNameStrategy.RECORD_NAME)
                        .setSchemaId(Integer.valueOf(valueSchemaKey.getSchemaId()))
                        .setData(valueJson)
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                this.testEnv.schemaRegistry().createProtobufDeserializer(),
                this.testEnv.schemaRegistry().createProtobufDeserializer());

        DynamicMessage expectedKey =
            DynamicMessage.newBuilder(keySchema.toDescriptor())
                .setField(keySchema.toDescriptor().findFieldByName("foo"), "foz")
                .build();
        DynamicMessage expectedValue =
            DynamicMessage.newBuilder(valueSchema.toDescriptor())
                .setField(valueSchema.toDescriptor().findFieldByName("bar"), "baz")
                .build();
        // Compare serialized bytes of the expected and consumed messages.
        Assert.assertEquals(expectedKey.toByteString(), ((Message) consumed.key()).toByteString());
        Assert.assertEquals(expectedValue.toByteString(), ((Message) consumed.value()).toByteString());
    }

    /**
     * Produces a BINARY key/value record with the request's partition id set to 1, then reads
     * the record from partition 1 at the offset reported in the response and checks that the key
     * and value bytes match what was sent.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryWithPartitionId() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString keyBytes = ByteString.copyFromUtf8("foo");
        ByteString valueBytes = ByteString.copyFromUtf8("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setPartitionId(1)
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(keyBytes.toByteArray()))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(valueBytes.toByteArray()))
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        // The partition is fixed to 1 here; only the offset is taken from the response.
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                1,
                readProduceResponse(response).getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertEquals(keyBytes, ByteString.copyFrom((byte[]) consumed.key()));
        Assert.assertEquals(valueBytes, ByteString.copyFrom((byte[]) consumed.value()));
    }

    /**
     * Produces a BINARY key/value record with an explicit timestamp of epoch millisecond 1000,
     * then checks that the record read back has the same key bytes, value bytes, and timestamp.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryWithTimestamp() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        Instant timestamp = Instant.ofEpochMilli(1000L);
        ByteString keyBytes = ByteString.copyFromUtf8("foo");
        ByteString valueBytes = ByteString.copyFromUtf8("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(keyBytes.toByteArray()))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(valueBytes.toByteArray()))
                        .build())
                .setTimestamp(timestamp)
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertEquals(keyBytes, ByteString.copyFrom((byte[]) consumed.key()));
        Assert.assertEquals(valueBytes, ByteString.copyFrom((byte[]) consumed.value()));
        Assert.assertEquals(timestamp, Instant.ofEpochMilli(consumed.timestamp()));
    }

    /**
     * Produces a BINARY key/value record with three headers (two named "header-1", one named
     * "header-2"), then checks that the record read back has the same key and value bytes and
     * that the headers under each name match, in order.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryWithHeaders() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString keyBytes = ByteString.copyFromUtf8("foo");
        ByteString valueBytes = ByteString.copyFromUtf8("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setHeaders(
                    Arrays.asList(
                        ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-1")),
                        ProduceRequest.ProduceRequestHeader.create("header-1", ByteString.copyFromUtf8("value-2")),
                        ProduceRequest.ProduceRequestHeader.create("header-2", ByteString.copyFromUtf8("value-3"))))
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(keyBytes.toByteArray()))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(valueBytes.toByteArray()))
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertEquals(keyBytes, ByteString.copyFrom((byte[]) consumed.key()));
        Assert.assertEquals(valueBytes, ByteString.copyFrom((byte[]) consumed.value()));

        // Both "header-1" entries are expected, in the order they were sent.
        Assert.assertEquals(
            Arrays.asList(
                new RecordHeader("header-1", ByteString.copyFromUtf8("value-1").toByteArray()),
                new RecordHeader("header-1", ByteString.copyFromUtf8("value-2").toByteArray())),
            ImmutableList.copyOf(consumed.headers().headers("header-1")));
        Assert.assertEquals(
            Collections.singletonList(
                new RecordHeader("header-2", ByteString.copyFromUtf8("value-3").toByteArray())),
            ImmutableList.copyOf(consumed.headers().headers("header-2")));
    }

    /**
     * Produces a record with a BINARY key and an AVRO value (raw schema
     * {@code {"type": "string"}}, data "bar"), then checks that the key bytes match and the
     * Avro-deserialized value equals "bar".
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryAndAvro() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString keyBytes = ByteString.copyFromUtf8("foo");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(keyBytes.toByteArray()))
                        .build())
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.AVRO)
                        .setRawSchema("{\"type\": \"string\"}")
                        .setData(TextNode.valueOf("bar"))
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                this.testEnv.schemaRegistry().createAvroDeserializer());
        Assert.assertEquals(keyBytes, ByteString.copyFrom((byte[]) consumed.key()));
        Assert.assertEquals("bar", consumed.value());
    }

    /**
     * Produces a record with only a BINARY key (no value set on the request), then checks that
     * the record read back has the same key bytes and a null value.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryKeyOnly() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString keyBytes = ByteString.copyFromUtf8("foo");

        ProduceRequest request =
            ProduceRequest.builder()
                .setKey(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(keyBytes.toByteArray()))
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertEquals(keyBytes, ByteString.copyFrom((byte[]) consumed.key()));
        Assert.assertNull(consumed.value());
    }

    /**
     * Produces a record with only a BINARY value (no key set on the request), then checks that
     * the record read back has a null key and the same value bytes.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceBinaryValueOnly() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        ByteString valueBytes = ByteString.copyFromUtf8("bar");

        ProduceRequest request =
            ProduceRequest.builder()
                .setValue(
                    ProduceRequest.ProduceRequestData.builder()
                        .setFormat(EmbeddedFormat.BINARY)
                        .setData(BinaryNode.valueOf(valueBytes.toByteArray()))
                        .build())
                .build();
        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(request, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertNull(consumed.key());
        Assert.assertEquals(valueBytes, ByteString.copyFrom((byte[]) consumed.value()));
    }

    /**
     * Posts a produce request built with neither key nor value set, then checks that the
     * response status is OK and that the record read back has a null key and a null value.
     *
     * @throws Exception if producing or consuming fails
     */
    @Test
    public void produceNothing() throws Exception {
        String recordsPath =
            "/v3/clusters/" + this.testEnv.kafkaCluster().getClusterId() + "/topics/" + TOPIC_NAME + "/records";
        ProduceRequest emptyRequest = ProduceRequest.builder().build();

        Response response =
            this.testEnv.kafkaRest().target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(emptyRequest, "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ProduceResponse produced = readProduceResponse(response);
        ConsumerRecord<?, ?> consumed =
            this.testEnv.kafkaCluster().getRecord(
                TOPIC_NAME,
                produced.getPartitionId(),
                produced.getOffset(),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
        Assert.assertNull(consumed.key());
        Assert.assertNull(consumed.value());
    }

    /**
     * Builds 1000 JSON produce requests ("key-i" / "value-i"), concatenates their serialized
     * forms into a single request body, and posts it in one call. Then reads each produced
     * record back using the partition and offset from the corresponding response and checks
     * that its key and value equal the text that was sent.
     *
     * @throws Exception if serializing, producing, or consuming fails
     */
    @Test
    public void produceJsonBatch() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();

        ArrayList<ProduceRequest> requests = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            requests.add(
                ProduceRequest.builder()
                    .setKey(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.JSON)
                            .setData(TextNode.valueOf("key-" + i))
                            .build())
                    .setValue(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.JSON)
                            .setData(TextNode.valueOf("value-" + i))
                            .build())
                    .build());
        }

        // The body is the serialized requests appended back to back, with no separator.
        ObjectMapper objectMapper = this.testEnv.kafkaRest().getObjectMapper();
        StringBuilder batch = new StringBuilder();
        for (ProduceRequest request : requests) {
            batch.append(objectMapper.writeValueAsString(request));
        }

        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(batch.toString(), "application/json"));
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ImmutableList<ProduceResponse> responses = readProduceResponses(response);
        KafkaJsonDeserializer<Object> deserializer = new KafkaJsonDeserializer<>();
        deserializer.configure(Collections.emptyMap(), false);

        for (int i = 0; i < 1000; i++) {
            ProduceResponse produced = responses.get(i);
            ConsumerRecord<?, ?> consumed =
                this.testEnv.kafkaCluster().getRecord(
                    TOPIC_NAME, produced.getPartitionId(), produced.getOffset(), deserializer, deserializer);

            ProduceRequest sent = requests.get(i);
            String expectedKey =
                sent.getKey().map(data -> data.getData()).map(node -> node.asText()).orElse(null);
            Assert.assertEquals(expectedKey, consumed.key());
            String expectedValue =
                sent.getValue().map(data -> data.getData()).map(node -> node.asText()).orElse(null);
            Assert.assertEquals(expectedValue, consumed.value());
        }
    }

    /**
     * Builds 1000 BINARY produce requests whose key and value data are integer nodes,
     * concatenates their serialized forms into a single request body, and posts it in one call.
     * Asserts that the HTTP status is OK and that each of the first 1000 error responses in the
     * body has error code 400.
     *
     * @throws Exception if serializing or posting the batch fails
     */
    @Test
    public void produceBinaryBatchWithInvalidData_throwsMultipleBadRequests() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();

        ArrayList<ProduceRequest> requests = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            requests.add(
                ProduceRequest.builder()
                    .setKey(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.BINARY)
                            .setData(IntNode.valueOf(2 * i))
                            .build())
                    .setValue(
                        ProduceRequest.ProduceRequestData.builder()
                            .setFormat(EmbeddedFormat.BINARY)
                            .setData(IntNode.valueOf((2 * i) + 1))
                            .build())
                    .build());
        }

        // The body is the serialized requests appended back to back, with no separator.
        ObjectMapper objectMapper = this.testEnv.kafkaRest().getObjectMapper();
        StringBuilder batch = new StringBuilder();
        for (ProduceRequest request : requests) {
            batch.append(objectMapper.writeValueAsString(request));
        }

        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(batch.toString(), "application/json"));
        // The HTTP status is still OK; the failures are reported per record in the body.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ImmutableList<ErrorResponse> errors = readErrorResponses(response);
        for (int i = 0; i < 1000; i++) {
            Assert.assertEquals(400L, errors.get(i).getErrorCode());
        }
    }

    /**
     * Posts a raw JSON body whose key declares type BINARY together with a {@code subject}, and
     * asserts an OK HTTP status with an {@link ErrorResponse} body carrying error code 400.
     *
     * @throws Exception if posting the request or reading the response fails
     */
    @Test
    public void produceBinaryWithSchemaSubject_returnsBadRequest() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        String body = "{ \"key\": { \"type\": \"BINARY\", \"subject\": \"foobar\" } }";

        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(body, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a raw JSON body whose key declares type BINARY together with a
     * {@code subject_name_strategy}, and asserts an OK HTTP status with an {@link ErrorResponse}
     * body carrying error code 400.
     *
     * @throws Exception if posting the request or reading the response fails
     */
    @Test
    public void produceBinaryWithSchemaSubjectStrategy_returnsBadRequest() throws Exception {
        String clusterId = this.testEnv.kafkaCluster().getClusterId();
        String body = "{ \"key\": { \"type\": \"BINARY\", \"subject_name_strategy\": \"TOPIC\" } }";

        Response response =
            this.testEnv.kafkaRest().target()
                .path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.entity(body, "application/json"));

        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares type {@code BINARY} together with an inline
     * {@code schema}, and verifies that the HTTP status is 200 while the {@link ErrorResponse}
     * read from the body carries error code 400.
     */
    @Test
    public void produceBinaryWithRawSchema_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload =
            "{ \"key\": { \"type\": \"BINARY\", \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\" } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares type {@code BINARY} together with a {@code schema_id},
     * and verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceBinaryWithSchemaId_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"type\": \"BINARY\", \"schema_id\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares type {@code BINARY} together with a
     * {@code schema_version}, and verifies that the HTTP status is 200 while the
     * {@link ErrorResponse} read from the body carries error code 400.
     */
    @Test
    public void produceBinaryWithSchemaVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"type\": \"BINARY\", \"schema_version\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares type {@code AVRO} together with a {@code schema_version},
     * and verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceAvroWithTypeAndSchemaVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"type\": \"AVRO\", \"schema_version\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares type {@code AVRO} together with a {@code schema_id}, and
     * verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceAvroWithTypeAndSchemaId_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"type\": \"AVRO\", \"schema_id\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key declares only type {@code AVRO} (no schema, id, or version), and
     * verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceAvroWithTypeAndLatestSchema_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"type\": \"AVRO\" } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies both a {@code subject} and a
     * {@code subject_name_strategy}, and verifies that the HTTP status is 200 while the
     * {@link ErrorResponse} read from the body carries error code 400.
     */
    @Test
    public void produceAvroWithSchemaSubjectAndSchemaSubjectStrategy_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload =
            "{ \"key\": { \"subject\": \"foobar\", \"subject_name_strategy\": \"TOPIC\" } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies both a {@code schema_id} and a {@code schema_version},
     * and verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceAvroWithSchemaIdAndSchemaVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"schema_id\": 1, \"schema_version\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies both an inline {@code schema} and a {@code schema_id},
     * and verifies that the HTTP status is 200 while the {@link ErrorResponse} read from the body
     * carries error code 400.
     */
    @Test
    public void produceAvroWithRawSchemaAndSchemaId_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload =
            "{ \"key\": { \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\", \"schema_id\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies both an inline {@code schema} and a
     * {@code schema_version}, and verifies that the HTTP status is 200 while the
     * {@link ErrorResponse} read from the body carries error code 400.
     */
    @Test
    public void produceAvroWithRawSchemaAndSchemaVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload =
            "{ \"key\": { \"schema\": \"{ \\\"type\\\": \\\"string\\\" }\", \"schema_version\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies {@code subject_name_strategy} {@code RECORD_NAME}
     * together with a {@code schema_version}, and verifies that the HTTP status is 200 while the
     * {@link ErrorResponse} read from the body carries error code 400.
     */
    @Test
    public void produceAvroWithRecordSchemaSubjectStrategyAndSchemaVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload =
            "{ \"key\": { \"subject_name_strategy\": \"RECORD_NAME\", \"schema_version\": 1 } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Posts a record whose key specifies only {@code subject_name_strategy} {@code RECORD_NAME}
     * (no schema, id, or version), and verifies that the HTTP status is 200 while the
     * {@link ErrorResponse} read from the body carries error code 400.
     */
    @Test
    public void produceAvroWithRecordSchemaSubjectStrategyAndLatestVersion_returnsBadRequest() throws Exception {
        String recordsPath =
            "/v3/clusters/"
                + this.testEnv.kafkaCluster().getClusterId()
                + "/topics/"
                + TOPIC_NAME
                + "/records";
        String payload = "{ \"key\": { \"subject_name_strategy\": \"RECORD_NAME\" } }";

        Response response =
            this.testEnv
                .kafkaRest()
                .target()
                .path(recordsPath)
                .request()
                .accept("application/json")
                .post(Entity.entity(payload, "application/json"));

        // The HTTP call itself succeeds; the failure is reported in the response entity.
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ErrorResponse error = response.readEntity(ErrorResponse.class);
        Assert.assertEquals(400L, error.getErrorCode());
    }

    /**
     * Reads the body of {@code response} as a {@link ProduceResponse}.
     *
     * <p>The entity is buffered before the first read so the body can be read a second time. If
     * reading it as a {@code ProduceResponse} fails with a {@link ProcessingException}, the body
     * is re-read as an {@link ErrorResponse} and its string form becomes the message of the
     * thrown exception.
     *
     * @param response the HTTP response to read
     * @return the body deserialized as a {@code ProduceResponse}
     * @throws RuntimeException wrapping the {@code ProcessingException} when the body cannot be
     *     read as a {@code ProduceResponse}
     */
    private static ProduceResponse readProduceResponse(Response response) {
        response.bufferEntity();
        ProduceResponse produceResponse;
        try {
            produceResponse = response.readEntity(ProduceResponse.class);
        } catch (ProcessingException readFailure) {
            ErrorResponse errorBody = response.readEntity(ErrorResponse.class);
            throw new RuntimeException(errorBody.toString(), readFailure);
        }
        return produceResponse;
    }

    /**
     * Reads the body of {@code response} as a sequence of {@link ProduceResponse} values and
     * collects them into an immutable list.
     *
     * @param response the HTTP response to read
     * @return every {@code ProduceResponse} yielded by the body's {@link MappingIterator}, in
     *     iteration order
     */
    private static ImmutableList<ProduceResponse> readProduceResponses(Response response) {
        // MappingIterator<T> is already an Iterator<T>; keeping the element type avoids the
        // raw-type cast and the unchecked copyOf call it caused.
        MappingIterator<ProduceResponse> produceResponses =
            response.readEntity(new GenericType<MappingIterator<ProduceResponse>>() {});
        return ImmutableList.copyOf(produceResponses);
    }

    /**
     * Reads the body of {@code response} as a sequence of {@link ErrorResponse} values and
     * collects them into an immutable list.
     *
     * @param response the HTTP response to read
     * @return every {@code ErrorResponse} yielded by the body's {@link MappingIterator}, in
     *     iteration order
     */
    private static ImmutableList<ErrorResponse> readErrorResponses(Response response) {
        // MappingIterator<T> is already an Iterator<T>; keeping the element type avoids the
        // raw-type cast and the unchecked copyOf call it caused.
        MappingIterator<ErrorResponse> errorResponses =
            response.readEntity(new GenericType<MappingIterator<ErrorResponse>>() {});
        return ImmutableList.copyOf(errorResponses);
    }
}
