package io.confluent.kafkarest.controllers;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.RegisteredSchema;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.rest.exceptions.RestConstraintViolationException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:io/confluent/kafkarest/controllers/RecordSerializerFacadeTest.class */
public class RecordSerializerFacadeTest {
    // Topic name the tests pass to serialize/deserialize calls and to the subject-name strategy.
    private static final String TOPIC_NAME = "topic-1";
    // Scope handed to MockSchemaRegistry.getClientForScope in setUp.
    // NOTE(review): presumably the same scope the "mock://sr" URL below resolves to — confirm.
    private static final String SCHEMA_REGISTRY_SCOPE = "sr";
    // Configuration map shared by the serializers built in setUp and by the deserializers the
    // tests use to read the serialized bytes back.
    private static final Map<String, Object> SCHEMA_SERIALIZER_CONFIGS = ImmutableMap.of("schema.registry.url", "mock://sr", "auto.register.schemas", false, "use.latest.version", false);
    // Strategy used by the tests to derive the subject name for a topic/schema pair.
    private static final SubjectNameStrategy SUBJECT_NAME_STRATEGY = new TopicNameStrategy();
    // NOTE(review): presumably the schema version attached to RegisteredSchema instances — confirm.
    public static final int SCHEMA_VERSION = 1;
    // Mock registry client obtained in setUp; tests register schemas through it.
    private MockSchemaRegistryClient schemaRegistryClient;
    // Serializer under test, rebuilt in setUp.
    private RecordSerializer recordSerializer;

    /**
     * Runs before each test: obtains the mock schema registry client for
     * {@link #SCHEMA_REGISTRY_SCOPE} (with Avro, JSON Schema and Protobuf providers) and builds the
     * {@link RecordSerializerFacade} under test on top of it.
     */
    @Before
    public void setUp() {
        schemaRegistryClient = MockSchemaRegistry.getClientForScope(
                SCHEMA_REGISTRY_SCOPE,
                Arrays.asList(
                        new AvroSchemaProvider(),
                        new JsonSchemaProvider(),
                        new ProtobufSchemaProvider()));
        NoSchemaRecordSerializer noSchemaSerializer =
                new NoSchemaRecordSerializer(SCHEMA_SERIALIZER_CONFIGS);
        // The schema-aware serializer is handed over as a lambda rather than as an instance.
        recordSerializer = new RecordSerializerFacade(
                noSchemaSerializer,
                () -> new SchemaRecordSerializer(
                        schemaRegistryClient,
                        SCHEMA_SERIALIZER_CONFIGS,
                        SCHEMA_SERIALIZER_CONFIGS,
                        SCHEMA_SERIALIZER_CONFIGS));
    }

    /** A base64 text node serialized as a BINARY key yields the decoded bytes. */
    @Test
    public void serializeBinaryKey_returnsSerialized() {
        String encoded = BaseEncoding.base64().encode("foobar".getBytes(StandardCharsets.UTF_8));
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), TextNode.valueOf(encoded), true)
                .get();
        Assertions.assertEquals("foobar", serialized.toStringUtf8());
    }

    /** A base64 text node serialized as a BINARY value yields the decoded bytes. */
    @Test
    public void serializeBinaryValue_returnsSerialized() {
        String encoded = BaseEncoding.base64().encode("foobar".getBytes(StandardCharsets.UTF_8));
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), TextNode.valueOf(encoded), false)
                .get();
        Assertions.assertEquals("foobar", serialized.toStringUtf8());
    }

    /** A null node serialized as a BINARY key produces an empty result. */
    @Test
    public void serializeNullBinaryKey_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), true)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** A null node serialized as a BINARY value produces an empty result. */
    @Test
    public void serializeNullBinaryValue_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), false)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** Serializing the text "fooba" as a BINARY key is expected to raise {@link BadRequestException}. */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidBinaryKey_throwsBadRequestException() {
        TextNode malformed = TextNode.valueOf("fooba");
        recordSerializer.serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), malformed, true);
    }

    /** Serializing the text "fooba" as a BINARY value is expected to raise {@link BadRequestException}. */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidBinaryValue_throwsBadRequestException() {
        TextNode malformed = TextNode.valueOf("fooba");
        recordSerializer.serialize(EmbeddedFormat.BINARY, TOPIC_NAME, Optional.empty(), malformed, false);
    }

    /** A text node serialized as a JSON key is written as a quoted JSON string. */
    @Test
    public void serializeStringJsonKey_returnsSerialized() {
        TextNode payload = TextNode.valueOf("foobar");
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("\"foobar\"", serialized.toStringUtf8());
    }

    /** An int node serialized as a JSON key is written as its decimal text. */
    @Test
    public void serializeIntJsonKey_returnsSerialized() {
        IntNode payload = IntNode.valueOf(123);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("123", serialized.toStringUtf8());
    }

    /** A float node serialized as a JSON key is written as "123.456". */
    @Test
    public void serializeFloatJsonKey_returnsSerialized() {
        FloatNode payload = FloatNode.valueOf(123.456f);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("123.456", serialized.toStringUtf8());
    }

    /** A boolean node serialized as a JSON key is written as the literal "true". */
    @Test
    public void serializeBooleanJsonKey_returnsSerialized() {
        BooleanNode payload = BooleanNode.valueOf(true);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("true", serialized.toStringUtf8());
    }

    /** An object node serialized as a JSON key keeps its fields in insertion order. */
    @Test
    public void serializeObjectJsonKey_returnsSerialized() {
        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", false);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("{\"foo\":1,\"bar\":false}", serialized.toStringUtf8());
    }

    /** An array node holding a float and a null serialized as a JSON key is written as "[123.456,null]". */
    @Test
    public void serializeArrayJsonKey_returnsSerialized() {
        ArrayNode payload = JsonNodeFactory.instance.arrayNode();
        payload.add(123.456f);
        payload.add(NullNode.getInstance());
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, true)
                .get();
        Assertions.assertEquals("[123.456,null]", serialized.toStringUtf8());
    }

    /** A null node serialized as a JSON key produces an empty result. */
    @Test
    public void serializeNullJsonKey_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), true)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** A text node serialized as a JSON value is written as a quoted JSON string. */
    @Test
    public void serializeStringJsonValue_returnsSerialized() {
        TextNode payload = TextNode.valueOf("foobar");
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("\"foobar\"", serialized.toStringUtf8());
    }

    /** An int node serialized as a JSON value is written as its decimal text. */
    @Test
    public void serializeIntJsonValue_returnsSerialized() {
        IntNode payload = IntNode.valueOf(123);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("123", serialized.toStringUtf8());
    }

    /** A float node serialized as a JSON value is written as "123.456". */
    @Test
    public void serializeFloatJsonValue_returnsSerialized() {
        FloatNode payload = FloatNode.valueOf(123.456f);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("123.456", serialized.toStringUtf8());
    }

    /** A boolean node serialized as a JSON value is written as the literal "true". */
    @Test
    public void serializeBooleanJsonValue_returnsSerialized() {
        BooleanNode payload = BooleanNode.valueOf(true);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("true", serialized.toStringUtf8());
    }

    /** An object node serialized as a JSON value keeps its fields in insertion order. */
    @Test
    public void serializeObjectJsonValue_returnsSerialized() {
        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", false);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("{\"foo\":1,\"bar\":false}", serialized.toStringUtf8());
    }

    /** An array node holding a float and a null serialized as a JSON value is written as "[123.456,null]". */
    @Test
    public void serializeArrayJsonValue_returnsSerialized() {
        ArrayNode payload = JsonNodeFactory.instance.arrayNode();
        payload.add(123.456f);
        payload.add(NullNode.getInstance());
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), payload, false)
                .get();
        Assertions.assertEquals("[123.456,null]", serialized.toStringUtf8());
    }

    /** A null node serialized as a JSON value produces an empty result. */
    @Test
    public void serializeNullJsonValue_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.JSON, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), false)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** A text node serialized as an AVRO key with a string schema reads back as "foobar". */
    @Test
    public void serializeStringAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"string\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals("foobar", actual);
    }

    /** An int node serialized as an AVRO key with an int schema reads back as 123. */
    @Test
    public void serializeIntAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), IntNode.valueOf(123), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(123, actual);
    }

    /** A float node serialized as an AVRO key with a float schema reads back as 123.456f. */
    @Test
    public void serializeFloatAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"float\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), FloatNode.valueOf(123.456f), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(Float.valueOf(123.456f), actual);
    }

    /** A boolean node serialized as an AVRO key with a boolean schema reads back as true. */
    @Test
    public void serializeBooleanAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"boolean\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), BooleanNode.valueOf(true), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(true, actual);
    }

    /**
     * A text node serialized as an AVRO key with a bytes schema reads back as the ISO-8859-1 bytes
     * of "foobar".
     */
    @Test
    public void serializeBytesAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"bytes\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        byte[] actualBytes = (byte[]) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        ByteString expected = ByteString.copyFrom("foobar", StandardCharsets.ISO_8859_1);
        Assertions.assertEquals(expected, ByteString.copyFrom(actualBytes));
    }

    /** An object node serialized as an AVRO key with a record schema reads back as an equal record. */
    @Test
    public void serializeRecordAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{    \"name\": \"foobar\",    \"type\": \"record\",    \"fields\": [        {\"name\": \"foo\", \"type\": \"int\"},        {\"name\": \"bar\", \"type\": \"boolean\"}    ]}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", false);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        GenericData.Record expected = new GenericData.Record(schema.rawSchema());
        expected.put("foo", 1);
        expected.put("bar", false);
        Assertions.assertEquals(expected, actual);
    }

    /** A text node serialized as an AVRO key with an enum schema reads back as the "foo" symbol. */
    @Test
    public void serializeEnumAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{    \"name\": \"foobar\",    \"type\": \"enum\",    \"symbols\": [\"foo\", \"bar\"]}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foo"), true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        GenericData.EnumSymbol expected = new GenericData.EnumSymbol(schema.rawSchema(), "foo");
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(expected, actual);
    }

    /**
     * An array node of ints serialized as an AVRO key with an array-of-int schema reads back as an
     * equal {@link GenericData.Array}.
     *
     * <p>The expected array is declared as {@code GenericData.Array<Integer>} so the comparison
     * value carries its element type instead of being a raw type.
     */
    @Test
    public void serializeArrayAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"array\", \"items\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ArrayNode payload = new ArrayNode(JsonNodeFactory.instance);
        payload.add(1);
        payload.add(2);
        payload.add(3);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        GenericData.Array<Integer> expected =
                new GenericData.Array<>(schema.rawSchema(), Arrays.asList(1, 2, 3));
        Assertions.assertEquals(expected, actual);
    }

    /**
     * An object node serialized as an AVRO key with a map-of-int schema reads back as a map equal
     * to {@code {foo=1, bar=2}} keyed by {@link Utf8}.
     *
     * <p>The expected map is declared as {@code Map<Utf8, Integer>} so the key and value types are
     * checked by the compiler instead of going through a raw {@code HashMap}.
     */
    @Test
    public void serializeMapAvroKey_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"map\", \"values\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ObjectNode payload = new ObjectNode(JsonNodeFactory.instance);
        payload.put("foo", 1);
        payload.put("bar", 2);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, true)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        Map<Utf8, Integer> expected = new HashMap<>();
        expected.put(new Utf8("foo"), 1);
        expected.put(new Utf8("bar"), 2);
        Assertions.assertEquals(expected, actual);
    }

    /** A null node serialized as an AVRO key without a schema produces an empty result. */
    @Test
    public void serializeNullAvroKeyNullSchema_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), true)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** A null node serialized as an AVRO key produces an empty result even with a plain int schema. */
    @Test
    public void serializeNullAvroKeyNonNullableSchema_returnsEmpty() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        boolean present = recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), NullNode.getInstance(), true)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null AVRO key without a schema is expected to raise
     * {@link RestConstraintViolationException}.
     *
     * <p>Note: the method name mentions SerializationException, but the type actually asserted is
     * {@code RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullAvroKeyNullSchema_throwsSerializationException() {
        TextNode payload = TextNode.valueOf("foobar");
        recordSerializer.serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.empty(), payload, true);
    }

    /**
     * Serializing a text node as an AVRO key against an int schema must raise
     * {@link BadRequestException}.
     *
     * <p>The expectation is scoped to the {@code serialize} call with {@code assertThrows}, so an
     * exception thrown while building or registering the schema cannot satisfy the test.
     */
    @Test
    public void serializeInvalidAvroKey_throwsBadRequestException() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        Assertions.assertThrows(
                BadRequestException.class,
                () -> recordSerializer.serialize(
                        EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), true));
    }

    /** A text node serialized as an AVRO value with a string schema reads back as "foobar". */
    @Test
    public void serializeStringAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"string\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals("foobar", actual);
    }

    /** An int node serialized as an AVRO value with an int schema reads back as 123. */
    @Test
    public void serializeIntAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), IntNode.valueOf(123), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(123, actual);
    }

    /** A float node serialized as an AVRO value with a float schema reads back as 123.456f. */
    @Test
    public void serializeFloatAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"float\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), FloatNode.valueOf(123.456f), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(Float.valueOf(123.456f), actual);
    }

    /** A boolean node serialized as an AVRO value with a boolean schema reads back as true. */
    @Test
    public void serializeBooleanAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"boolean\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), BooleanNode.valueOf(true), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(true, actual);
    }

    /**
     * A text node serialized as an AVRO value with a bytes schema reads back as the ISO-8859-1
     * bytes of "foobar".
     */
    @Test
    public void serializeBytesAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"bytes\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        byte[] actualBytes = (byte[]) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        ByteString expected = ByteString.copyFrom("foobar", StandardCharsets.ISO_8859_1);
        Assertions.assertEquals(expected, ByteString.copyFrom(actualBytes));
    }

    /** An object node serialized as an AVRO value with a record schema reads back as an equal record. */
    @Test
    public void serializeRecordAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{    \"name\": \"foobar\",    \"type\": \"record\",    \"fields\": [        {\"name\": \"foo\", \"type\": \"int\"},        {\"name\": \"bar\", \"type\": \"boolean\"}    ]}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", false);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        GenericData.Record expected = new GenericData.Record(schema.rawSchema());
        expected.put("foo", 1);
        expected.put("bar", false);
        Assertions.assertEquals(expected, actual);
    }

    /** A text node serialized as an AVRO value with an enum schema reads back as the "foo" symbol. */
    @Test
    public void serializeEnumAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{    \"name\": \"foobar\",    \"type\": \"enum\",    \"symbols\": [\"foo\", \"bar\"]}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foo"), false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        GenericData.EnumSymbol expected = new GenericData.EnumSymbol(schema.rawSchema(), "foo");
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());
        Assertions.assertEquals(expected, actual);
    }

    /**
     * An array node of ints serialized as an AVRO value with an array-of-int schema reads back as
     * an equal {@link GenericData.Array}.
     *
     * <p>The expected array is declared as {@code GenericData.Array<Integer>} so the comparison
     * value carries its element type instead of being a raw type.
     */
    @Test
    public void serializeArrayAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"array\", \"items\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ArrayNode payload = new ArrayNode(JsonNodeFactory.instance);
        payload.add(1);
        payload.add(2);
        payload.add(3);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        GenericData.Array<Integer> expected =
                new GenericData.Array<>(schema.rawSchema(), Arrays.asList(1, 2, 3));
        Assertions.assertEquals(expected, actual);
    }

    /**
     * An object node serialized as an AVRO value with a map-of-int schema reads back as a map
     * equal to {@code {foo=1, bar=2}} keyed by {@link Utf8}.
     *
     * <p>The expected map is declared as {@code Map<Utf8, Integer>} so the key and value types are
     * checked by the compiler instead of going through a raw {@code HashMap}.
     */
    @Test
    public void serializeMapAvroValue_returnsSerialized() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"map\", \"values\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ObjectNode payload = new ObjectNode(JsonNodeFactory.instance);
        payload.put("foo", 1);
        payload.put("bar", 2);
        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), payload, false)
                .get();

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray(), schema.rawSchema());

        Map<Utf8, Integer> expected = new HashMap<>();
        expected.put(new Utf8("foo"), 1);
        expected.put(new Utf8("bar"), 2);
        Assertions.assertEquals(expected, actual);
    }

    /** A null node serialized as an AVRO value without a schema produces an empty result. */
    @Test
    public void serializeNullAvroValueNullSchema_returnsEmpty() {
        boolean present = recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.empty(), NullNode.getInstance(), false)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /** A null node serialized as an AVRO value produces an empty result even with a plain int schema. */
    @Test
    public void serializeNullAvroValueNonNullableSchema_returnsEmpty() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        boolean present = recordSerializer
                .serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), NullNode.getInstance(), false)
                .isPresent();
        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null AVRO value without a schema is expected to raise
     * {@link RestConstraintViolationException}.
     *
     * <p>Note: the method name mentions SerializationException, but the type actually asserted is
     * {@code RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullAvroValueNullSchema_throwsSerializationException() {
        TextNode payload = TextNode.valueOf("foobar");
        recordSerializer.serialize(EmbeddedFormat.AVRO, TOPIC_NAME, Optional.empty(), payload, false);
    }

    /**
     * Serializing a text node as an AVRO value against an int schema must raise
     * {@link BadRequestException}.
     *
     * <p>The expectation is scoped to the {@code serialize} call with {@code assertThrows}, so an
     * exception thrown while building or registering the schema cannot satisfy the test.
     */
    @Test
    public void serializeInvalidAvroValue_throwsBadRequestException() throws Exception {
        AvroSchema schema = new AvroSchema("{\"type\": \"int\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        Assertions.assertThrows(
                BadRequestException.class,
                () -> recordSerializer.serialize(
                        EmbeddedFormat.AVRO, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), false));
    }

    /**
     * A text node serialized as a JSONSCHEMA key with a string schema reads back as "foobar".
     *
     * <p>The verifying deserializer is declared as {@code KafkaJsonSchemaDeserializer<Object>}
     * rather than as a raw type; the deserialized value is still compared as a plain object.
     */
    @Test
    public void serializeStringJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"string\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), TextNode.valueOf("foobar"), true)
                .get();

        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());
        Assertions.assertEquals("foobar", actual);
    }

    /**
     * An int node serialized as a JSONSCHEMA key with an integer schema reads back as 123.
     *
     * <p>The verifying deserializer is declared as {@code KafkaJsonSchemaDeserializer<Object>}
     * rather than as a raw type; the deserialized value is still compared as a plain object.
     */
    @Test
    public void serializeIntJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, SCHEMA_VERSION, schema);

        ByteString serialized = (ByteString) recordSerializer
                .serialize(EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), IntNode.valueOf(123), true)
                .get();

        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());
        Assertions.assertEquals(123, actual);
    }

    /**
     * Serializes the float node {@code 123.456f} as a record key against the JSON Schema
     * {@code {"type": "number"}} and checks that {@link KafkaJsonSchemaDeserializer} reads back
     * a {@link BigDecimal} equal to {@code 123.456}.
     */
    @Test
    public void serializeFloatJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"number\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        FloatNode.valueOf(123.456f),
                        true)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(new BigDecimal("123.456"), roundTripped);
    }

    /**
     * Serializes the boolean node {@code true} as a record key against the JSON Schema
     * {@code {"type": "boolean"}} and checks that {@link KafkaJsonSchemaDeserializer} reads
     * {@code true} back.
     */
    @Test
    public void serializeBooleanJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"boolean\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        BooleanNode.valueOf(true),
                        true)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(true, roundTripped);
    }

    /**
     * Serializes the object {@code {"foo": 1, "bar": true}} as a record key against an object
     * JSON Schema with an integer {@code foo} and a boolean {@code bar}, then checks that
     * {@link KafkaJsonSchemaDeserializer} reads back a map with the same two entries.
     */
    @Test
    public void serializeObjectJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{    \"type\": \"object\",    \"properties\": {        \"foo\": {\"type\": \"integer\"},        \"bar\": {\"type\": \"boolean\"}    }}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), payload, true)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        // The cast is kept so that a non-map result still fails with ClassCastException.
        Map<?, ?> actual = (Map<?, ?>) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Map<String, Object> expected = new HashMap<>();
        expected.put("foo", 1);
        expected.put("bar", true);
        Assertions.assertEquals(expected, actual);
    }

    /**
     * Serializes the array {@code [1, 2]} as a record key against an array-of-integers JSON
     * Schema and checks that {@link KafkaJsonSchemaDeserializer} reads back a list equal to
     * {@code [1, 2]}.
     */
    @Test
    public void serializeArrayJsonschemaKey_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{    \"type\": \"array\",    \"items\": {\"type\": \"integer\"}}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ArrayNode payload = JsonNodeFactory.instance.arrayNode();
        payload.add(1);
        payload.add(2);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), payload, true)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        // The cast is kept so that a non-list result still fails with ClassCastException.
        List<?> actual = (List<?>) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(Arrays.asList(1, 2), actual);
    }

    /**
     * A JSON null key serialized as JSONSCHEMA with no schema supplied must produce an empty
     * result.
     */
    @Test
    public void serializeNullJsonschemaKeyNullSchema_returnsEmpty() {
        NullNode nullPayload = NullNode.getInstance();

        boolean present =
            this.recordSerializer
                .serialize(EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.empty(), nullPayload, true)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * A JSON null key must produce an empty result even when the registered JSON Schema is
     * {@code {"type": "integer"}}.
     */
    @Test
    public void serializeNullJsonschemaKeyNonNullableSchema_returnsEmpty() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        boolean present =
            this.recordSerializer
                .serialize(
                    EmbeddedFormat.JSONSCHEMA,
                    TOPIC_NAME,
                    Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
                    NullNode.getInstance(),
                    true)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null JSONSCHEMA key without a schema must fail. Despite the method
     * name, the exception expected here is {@link RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullJsonschemaKeyNullSchema_throwsSerializationException() {
        TextNode payload = TextNode.valueOf("foobar");

        this.recordSerializer.serialize(
            EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.empty(), payload, true);
    }

    /**
     * Serializing the text {@code "foobar"} as a key against the JSON Schema
     * {@code {"type": "integer"}} must fail with {@link BadRequestException}.
     */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidJsonschemaKey_throwsBadRequestException() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        this.recordSerializer.serialize(
            EmbeddedFormat.JSONSCHEMA,
            TOPIC_NAME,
            Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
            TextNode.valueOf("foobar"),
            true);
    }

    /**
     * Serializes the text node {@code "foobar"} as a record value against the JSON Schema
     * {@code {"type": "string"}} and checks that {@link KafkaJsonSchemaDeserializer} reads the
     * same string back.
     */
    @Test
    public void serializeStringJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"string\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        TextNode.valueOf("foobar"),
                        false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals("foobar", roundTripped);
    }

    /**
     * Serializes the integer node {@code 123} as a record value against the JSON Schema
     * {@code {"type": "integer"}} and checks that {@link KafkaJsonSchemaDeserializer} reads
     * {@code 123} back.
     */
    @Test
    public void serializeIntJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        IntNode.valueOf(123),
                        false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(123, roundTripped);
    }

    /**
     * Serializes the float node {@code 123.456f} as a record value against the JSON Schema
     * {@code {"type": "number"}} and checks that {@link KafkaJsonSchemaDeserializer} reads back
     * a {@link BigDecimal} equal to {@code 123.456}.
     */
    @Test
    public void serializeFloatJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"number\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        FloatNode.valueOf(123.456f),
                        false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(new BigDecimal("123.456"), roundTripped);
    }

    /**
     * Serializes the boolean node {@code true} as a record value against the JSON Schema
     * {@code {"type": "boolean"}} and checks that {@link KafkaJsonSchemaDeserializer} reads
     * {@code true} back.
     */
    @Test
    public void serializeBooleanJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"boolean\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA,
                        TOPIC_NAME,
                        Optional.of(registered),
                        BooleanNode.valueOf(true),
                        false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Object roundTripped = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(true, roundTripped);
    }

    /**
     * Serializes the object {@code {"foo": 1, "bar": true}} as a record value against an object
     * JSON Schema with an integer {@code foo} and a boolean {@code bar}, then checks that
     * {@link KafkaJsonSchemaDeserializer} reads back a map with the same two entries.
     */
    @Test
    public void serializeObjectJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{    \"type\": \"object\",    \"properties\": {        \"foo\": {\"type\": \"integer\"},        \"bar\": {\"type\": \"boolean\"}    }}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), payload, false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        // The cast is kept so that a non-map result still fails with ClassCastException.
        Map<?, ?> actual = (Map<?, ?>) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Map<String, Object> expected = new HashMap<>();
        expected.put("foo", 1);
        expected.put("bar", true);
        Assertions.assertEquals(expected, actual);
    }

    /**
     * Serializes the array {@code [1, 2]} as a record value against an array-of-integers JSON
     * Schema and checks that {@link KafkaJsonSchemaDeserializer} reads back a list equal to
     * {@code [1, 2]}.
     */
    @Test
    public void serializeArrayJsonschemaValue_returnsSerialized() throws Exception {
        JsonSchema schema = new JsonSchema("{    \"type\": \"array\",    \"items\": {\"type\": \"integer\"}}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ArrayNode payload = JsonNodeFactory.instance.arrayNode();
        payload.add(1);
        payload.add(2);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.of(registered), payload, false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaJsonSchemaDeserializer<Object> deserializer = new KafkaJsonSchemaDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        // The cast is kept so that a non-list result still fails with ClassCastException.
        List<?> actual = (List<?>) deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        Assertions.assertEquals(Arrays.asList(1, 2), actual);
    }

    /**
     * A JSON null value serialized as JSONSCHEMA with no schema supplied must produce an empty
     * result.
     */
    @Test
    public void serializeNullJsonschemaValueNullSchema_returnsEmpty() {
        NullNode nullPayload = NullNode.getInstance();

        boolean present =
            this.recordSerializer
                .serialize(EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.empty(), nullPayload, false)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * A JSON null value must produce an empty result even when the registered JSON Schema is
     * {@code {"type": "integer"}}.
     */
    @Test
    public void serializeNullJsonschemaValueNonNullableSchema_returnsEmpty() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        boolean present =
            this.recordSerializer
                .serialize(
                    EmbeddedFormat.JSONSCHEMA,
                    TOPIC_NAME,
                    Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
                    NullNode.getInstance(),
                    false)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null JSONSCHEMA value without a schema must fail. Despite the method
     * name, the exception expected here is {@link RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullJsonschemaValueNullSchema_throwsSerializationException() {
        TextNode payload = TextNode.valueOf("foobar");

        this.recordSerializer.serialize(
            EmbeddedFormat.JSONSCHEMA, TOPIC_NAME, Optional.empty(), payload, false);
    }

    /**
     * Serializing the text {@code "foobar"} as a value against the JSON Schema
     * {@code {"type": "integer"}} must fail with {@link BadRequestException}.
     */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidJsonschemaValue_throwsBadRequestException() throws Exception {
        JsonSchema schema = new JsonSchema("{\"type\": \"integer\"}");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        this.recordSerializer.serialize(
            EmbeddedFormat.JSONSCHEMA,
            TOPIC_NAME,
            Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
            TextNode.valueOf("foobar"),
            false);
    }

    /**
     * Serializes the object {@code {"foo": 1, "bar": true}} as a record key against the proto3
     * message {@code ObjectKey} and checks that {@link KafkaProtobufDeserializer} reads back a
     * message equal to a {@link DynamicMessage} built with the same two field values.
     */
    @Test
    public void serializeObjectProtobufKey_returnsSerialized() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message ObjectKey { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.of(registered), payload, true)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaProtobufDeserializer<Message> deserializer = new KafkaProtobufDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, true);
        Message actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        DynamicMessage expected =
            DynamicMessage.newBuilder(schema.toDescriptor())
                .setField(schema.toDescriptor().findFieldByName("foo"), 1)
                .setField(schema.toDescriptor().findFieldByName("bar"), true)
                .build();
        Assertions.assertEquals(expected, actual);
    }

    /**
     * A JSON null key serialized as PROTOBUF with no schema supplied must produce an empty
     * result.
     */
    @Test
    public void serializeNullProtobfuKeyNullSchema_returnsEmpty() {
        NullNode nullPayload = NullNode.getInstance();

        boolean present =
            this.recordSerializer
                .serialize(EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.empty(), nullPayload, true)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * A JSON null key must produce an empty result even when a Protobuf schema (message
     * {@code NullKey}) is registered and supplied.
     */
    @Test
    public void serializeNullProtobufKeyNonNullableSchema_returnsEmpty() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message NullKey { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        boolean present =
            this.recordSerializer
                .serialize(
                    EmbeddedFormat.PROTOBUF,
                    TOPIC_NAME,
                    Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
                    NullNode.getInstance(),
                    true)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null PROTOBUF key without a schema must fail. Despite the method name,
     * the exception expected here is {@link RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullProtobufKeyNullSchema_throwsSerializationException() {
        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        this.recordSerializer.serialize(
            EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.empty(), payload, true);
    }

    /**
     * Serializing {@code {"foo": "bar"}} as a key against the proto3 message {@code InvalidKey},
     * whose {@code foo} field is an {@code int32}, must fail with {@link BadRequestException}.
     */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidProtobufKey_throwsBadRequestException() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message InvalidKey { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, true, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", "bar");

        this.recordSerializer.serialize(
            EmbeddedFormat.PROTOBUF,
            TOPIC_NAME,
            Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
            payload,
            true);
    }

    /**
     * Serializes the object {@code {"foo": 1, "bar": true}} as a record value against the
     * proto3 message {@code ObjectValueKey} and checks that {@link KafkaProtobufDeserializer}
     * reads back a message equal to a {@link DynamicMessage} built with the same two field
     * values.
     */
    @Test
    public void serializeObjectProtobufValue_returnsSerialized() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message ObjectValueKey { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);
        RegisteredSchema registered = RegisteredSchema.create(subject, schemaId, 1, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        ByteString serialized =
            (ByteString)
                this.recordSerializer
                    .serialize(
                        EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.of(registered), payload, false)
                    .get();

        // Parameterized rather than raw, so the compiler checks the deserializer calls.
        KafkaProtobufDeserializer<Message> deserializer = new KafkaProtobufDeserializer<>();
        deserializer.configure(SCHEMA_SERIALIZER_CONFIGS, false);
        Message actual = deserializer.deserialize(TOPIC_NAME, serialized.toByteArray());

        DynamicMessage expected =
            DynamicMessage.newBuilder(schema.toDescriptor())
                .setField(schema.toDescriptor().findFieldByName("foo"), 1)
                .setField(schema.toDescriptor().findFieldByName("bar"), true)
                .build();
        Assertions.assertEquals(expected, actual);
    }

    /**
     * A JSON null value serialized as PROTOBUF with no schema supplied must produce an empty
     * result.
     */
    @Test
    public void serializeNullProtobfuValueNullSchema_returnsEmpty() {
        NullNode nullPayload = NullNode.getInstance();

        boolean present =
            this.recordSerializer
                .serialize(EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.empty(), nullPayload, false)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * A JSON null value must produce an empty result even when a Protobuf schema (message
     * {@code NullValue}) is registered and supplied.
     */
    @Test
    public void serializeNullProtobufValueNonNullableSchema_returnsEmpty() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message NullValue { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        boolean present =
            this.recordSerializer
                .serialize(
                    EmbeddedFormat.PROTOBUF,
                    TOPIC_NAME,
                    Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
                    NullNode.getInstance(),
                    false)
                .isPresent();

        Assertions.assertFalse(present);
    }

    /**
     * Serializing a non-null PROTOBUF value without a schema must fail. Despite the method
     * name, the exception expected here is {@link RestConstraintViolationException}.
     */
    @Test(expected = RestConstraintViolationException.class)
    public void serializeNonNullProtobufValueNullSchema_throwsSerializationException() {
        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", 1);
        payload.put("bar", true);

        this.recordSerializer.serialize(
            EmbeddedFormat.PROTOBUF, TOPIC_NAME, Optional.empty(), payload, false);
    }

    /**
     * Serializing {@code {"foo": "bar"}} as a value against the proto3 message
     * {@code InvalidValue}, whose {@code foo} field is an {@code int32}, must fail with
     * {@link BadRequestException}.
     */
    @Test(expected = BadRequestException.class)
    public void serializeInvalidProtobufValue_throwsBadRequestException() throws Exception {
        ProtobufSchema schema = new ProtobufSchema("syntax = \"proto3\"; message InvalidValue { int32 foo = 1; bool bar = 2; }");
        String subject = SUBJECT_NAME_STRATEGY.subjectName(TOPIC_NAME, false, schema);
        int schemaId = this.schemaRegistryClient.register(subject, schema);

        ObjectNode payload = JsonNodeFactory.instance.objectNode();
        payload.put("foo", "bar");

        this.recordSerializer.serialize(
            EmbeddedFormat.PROTOBUF,
            TOPIC_NAME,
            Optional.of(RegisteredSchema.create(subject, schemaId, 1, schema)),
            payload,
            false);
    }
}
