package org.apache.beam.sdk.io.kafka;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.class */
public class KafkaRecordCoderTest {
    @Test
    public void testCoderIsSerializableWithWellKnownCoderType() {
        CoderProperties.coderSerializable(KafkaRecordCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
    }

    @Test
    public void testKafkaRecordSerializableWithHeaders() throws IOException {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add("headerKey", "headerVal".getBytes(StandardCharsets.UTF_8));
        verifySerialization(recordHeaders);
    }

    @Test
    public void testKafkaRecordSerializableWithoutHeaders() throws IOException {
        verifySerialization(new ConsumerRecord("", 0, 0L, "", "").headers());
    }

    private void verifySerialization(Headers headers) throws IOException {
        KafkaRecord kafkaRecord = new KafkaRecord("topic", 0, 0L, 0L, KafkaTimestampType.CREATE_TIME, headers, "key", "value");
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        KafkaRecordCoder of = KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
        of.encode(kafkaRecord, byteArrayOutputStream);
        Assert.assertEquals(kafkaRecord, of.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())));
    }
}
