package io.confluent.kafka.formatter;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/kafka/formatter/KafkaAvroFormatterTest.class */
public class KafkaAvroFormatterTest {
    private static final String RECORD_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
    private static final String RECORD_KEY_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"keyRecord\",\"fields\": [{\"name\": \"key_field\", \"type\": \"string\"}]}";
    private static final String RECORD_VALUE_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"valueRecord\",\"fields\": [{\"name\": \"value_field\", \"type\": \"string\"}]}";
    private Properties props;
    private AvroMessageFormatter formatter;
    private Schema recordSchema = null;
    private Schema intSchema = null;
    private String url = "mock://test";
    private SchemaRegistryClient schemaRegistry = null;

    @Before
    public void setUp() {
        this.props = new Properties();
        this.props.put("schema.registry.url", this.url);
        Schema.Parser parser = new Schema.Parser();
        this.recordSchema = parser.parse(RECORD_SCHEMA_STRING);
        this.intSchema = parser.parse("{\"type\" : \"int\"}");
        this.schemaRegistry = MockSchemaRegistry.getClientForScope("test");
        this.formatter = new AvroMessageFormatter(this.url, (Deserializer) null);
    }

    @After
    public void tearDown() {
        MockSchemaRegistry.dropScope("test");
    }

    @Test
    public void testKafkaAvroValueFormatter() {
        this.formatter.init(this.props);
        byte[] bArr = (byte[]) new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage().value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream printStream = new PrintStream(byteArrayOutputStream);
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, 0, bArr.length, (Object) null, bArr), printStream);
        Assert.assertEquals("Input value json should match output value json", "{\"name\":\"myname\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testKafkaAvroKeyValueFormatter() {
        this.props.put("print.key", "true");
        this.formatter.init(this.props);
        ProducerRecord readMessage = new AvroMessageReader(this.url, this.intSchema, this.recordSchema, "topic1", true, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("10\t{\"name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage();
        byte[] bArr = (byte[]) readMessage.key();
        byte[] bArr2 = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream printStream = new PrintStream(byteArrayOutputStream);
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, bArr.length, bArr2.length, bArr, bArr2), printStream);
        Assert.assertEquals("Input key/value json should match output key/value json", "10\t{\"name\":\"myname\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testKafkaAvroValueWithTimestampFormatter() {
        this.props.put("print.timestamp", "true");
        this.formatter.init(this.props);
        TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
        String format = String.format("%s:%d\t%s", timestampType.name, 1000L, "{\"name\":\"myname\"}\n");
        byte[] bArr = (byte[]) new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage().value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, timestampType, 0L, 0, bArr.length, (Object) null, bArr), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input value json should match output value json", format, byteArrayOutputStream.toString());
    }

    @Test
    public void testInvalidFormat() {
        try {
            new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"invalid-field-name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage();
            Assert.fail("Registering an invalid schema should fail");
        } catch (SerializationException e) {
            Assert.assertTrue("The cause of the exception should be avro", e.getCause() instanceof AvroRuntimeException);
        }
    }

    @Test
    public void testStringKey() {
        this.props.put("print.key", "true");
        this.formatter = new AvroMessageFormatter(this.url, new StringDeserializer());
        this.formatter.init(this.props);
        ProducerRecord readMessage = new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage();
        byte[] bytes = "TestKey".getBytes();
        byte[] bArr = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, bytes.length, bArr.length, bytes, bArr), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input key/value json should match output key/value json", "TestKey\t{\"name\":\"myname\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testStringKeyWithTimestamp() {
        this.props.put("print.key", "true");
        this.props.put("print.timestamp", "true");
        this.formatter = new AvroMessageFormatter(this.url, new StringDeserializer());
        this.formatter.init(this.props);
        TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
        String format = String.format("%s:%d\tTestKey\t%s", timestampType.name, 1000L, "{\"name\":\"myname\"}\n");
        ProducerRecord readMessage = new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes()))), false, true, false).readMessage();
        byte[] bytes = "TestKey".getBytes();
        byte[] bArr = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, timestampType, 0L, bytes.length, bArr.length, bytes, bArr), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input key/value json should match output key/value json", format, byteArrayOutputStream.toString());
    }

    @Test
    public void testKafkaAvroValueUsingLatestVersion() throws Exception {
        this.formatter.init(this.props);
        this.schemaRegistry.register("topic1-value", new AvroSchema(this.recordSchema));
        byte[] bArr = (byte[]) new AvroMessageReader(this.url, (Schema) null, this.recordSchema, "topic1", false, new BufferedReader(new InputStreamReader(new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes()))), false, false, true).readMessage().value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintStream printStream = new PrintStream(byteArrayOutputStream);
        this.formatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, 0, bArr.length, (Object) null, bArr), printStream);
        Assert.assertEquals("Input value json should match output value json", "{\"name\":\"myname\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testUsingTopicRecordNameStrategy() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("topic", "mytopic");
        hashMap.put("schema.registry.url", "mock://foo");
        hashMap.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
        hashMap.put("auto.register.schemas", "false");
        hashMap.put("value.schema", RECORD_SCHEMA_STRING);
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(hashMap);
        SchemaRegistryClient clientForScope = MockSchemaRegistry.getClientForScope("foo");
        clientForScope.register("mytopic-value", new AvroSchema(this.recordSchema));
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("{\"name\":\"myname\"}\n".getBytes());
        AvroMessageReader avroMessageReader = new AvroMessageReader();
        Properties properties = new Properties();
        properties.putAll(hashMap);
        avroMessageReader.init(byteArrayInputStream, properties);
        try {
            avroMessageReader.readMessage();
            Assert.fail("Expected exception was not thrown. Exception should have been thrown due to schema not present in the mock schema registry with the TopicRecordNameStrategy, and auto-register disabled.");
        } catch (SerializationException e) {
            Assert.assertTrue(e.getMessage().contains("Error retrieving Avro schema"));
        }
        clientForScope.register("mytopic-" + this.recordSchema.getFullName(), new AvroSchema(this.recordSchema));
        byteArrayInputStream.reset();
        byte[] bArr = (byte[]) avroMessageReader.readMessage().value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        avroMessageFormatter.writeTo(new ConsumerRecord("topic1", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, 0, bArr.length, (Object) null, bArr), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input value json should match output value json", "{\"name\":\"myname\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testUsingSubjectNameStrategy() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("topic", "mytopic");
        hashMap.put("schema.registry.url", "mock://foo");
        hashMap.put("auto.register.schemas", "false");
        hashMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        hashMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        hashMap.put("parse.key", "true");
        hashMap.put("print.key", "true");
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(hashMap);
        SchemaRegistryClient clientForScope = MockSchemaRegistry.getClientForScope("foo");
        Schema.Parser parser = new Schema.Parser();
        Schema parse = parser.parse(RECORD_KEY_SCHEMA_STRING);
        Schema parse2 = parser.parse(RECORD_VALUE_SCHEMA_STRING);
        clientForScope.register("mytopic-key", new AvroSchema(parse));
        clientForScope.register("mytopic-value", new AvroSchema(parse2));
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n".getBytes());
        AvroMessageReader avroMessageReader = new AvroMessageReader();
        Properties properties = new Properties();
        properties.putAll(hashMap);
        avroMessageReader.init(byteArrayInputStream, properties);
        ProducerRecord readMessage = avroMessageReader.readMessage();
        byte[] bArr = (byte[]) readMessage.key();
        byte[] bArr2 = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        avroMessageFormatter.writeTo(new ConsumerRecord("mytopic", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, bArr.length, bArr2.length, bArr, bArr2), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input value json should match output value json", "{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testUsingHeaders() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("topic", "mytopic");
        hashMap.put("schema.registry.url", "mock://foo");
        hashMap.put("auto.register.schemas", "false");
        hashMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        hashMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        hashMap.put("parse.key", "true");
        hashMap.put("parse.headers", "true");
        hashMap.put("print.key", "true");
        hashMap.put("print.headers", "true");
        hashMap.put("headers.deserializer", StringDeserializer.class.getName());
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(hashMap);
        SchemaRegistryClient clientForScope = MockSchemaRegistry.getClientForScope("foo");
        Schema.Parser parser = new Schema.Parser();
        Schema parse = parser.parse(RECORD_KEY_SCHEMA_STRING);
        Schema parse2 = parser.parse(RECORD_VALUE_SCHEMA_STRING);
        clientForScope.register("mytopic-key", new AvroSchema(parse));
        clientForScope.register("mytopic-value", new AvroSchema(parse2));
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("headerKey0:headerValue0,headerKey1:headerValue\t{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n".getBytes());
        AvroMessageReader avroMessageReader = new AvroMessageReader();
        Properties properties = new Properties();
        properties.putAll(hashMap);
        avroMessageReader.init(byteArrayInputStream, properties);
        ProducerRecord readMessage = avroMessageReader.readMessage();
        byte[] bArr = (byte[]) readMessage.key();
        byte[] bArr2 = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        avroMessageFormatter.writeTo(new ConsumerRecord("mytopic", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, bArr.length, bArr2.length, bArr, bArr2, readMessage.headers(), Optional.empty()), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input value should match output value", "headerKey0:headerValue0,headerKey1:headerValue\t{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testUsingNull() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("topic", "mytopic");
        hashMap.put("schema.registry.url", "mock://foo");
        hashMap.put("auto.register.schemas", "false");
        hashMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        hashMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        hashMap.put("parse.key", "true");
        hashMap.put("parse.headers", "true");
        hashMap.put("null.marker", "<NULL>");
        hashMap.put("print.key", "true");
        hashMap.put("print.headers", "true");
        hashMap.put("null.literal", "<NULL>");
        hashMap.put("headers.deserializer", StringDeserializer.class.getName());
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(hashMap);
        SchemaRegistryClient clientForScope = MockSchemaRegistry.getClientForScope("foo");
        Schema.Parser parser = new Schema.Parser();
        Schema parse = parser.parse(RECORD_KEY_SCHEMA_STRING);
        Schema parse2 = parser.parse(RECORD_VALUE_SCHEMA_STRING);
        clientForScope.register("mytopic-key", new AvroSchema(parse));
        clientForScope.register("mytopic-value", new AvroSchema(parse2));
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("headerKey0:<NULL>,headerKey1:<NULL>\t<NULL>\t<NULL>\n".getBytes());
        AvroMessageReader avroMessageReader = new AvroMessageReader();
        Properties properties = new Properties();
        properties.putAll(hashMap);
        avroMessageReader.init(byteArrayInputStream, properties);
        ProducerRecord readMessage = avroMessageReader.readMessage();
        byte[] bArr = (byte[]) readMessage.key();
        byte[] bArr2 = (byte[]) readMessage.value();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        avroMessageFormatter.writeTo(new ConsumerRecord("mytopic", 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0, 0, bArr, bArr2, readMessage.headers(), Optional.empty()), new PrintStream(byteArrayOutputStream));
        Assert.assertEquals("Input value should match output value", "headerKey0:<NULL>,headerKey1:<NULL>\t<NULL>\t<NULL>\n", byteArrayOutputStream.toString());
    }
}
