package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ListMultimap;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link NestedPayloadKafkaTable}.
 *
 * <p>Covers schema validation performed by the constructor and the two conversions exercised here:
 * {@code transformInput} (Kafka record to {@link Row}) and {@code transformOutput} ({@link Row} to
 * Kafka {@link ProducerRecord}).
 */
@RunWith(JUnit4.class)
public class NestedPayloadKafkaTableTest {
    private static final String TOPIC = "mytopic";

    /** Write-side schema containing every field used by these tests; the timestamp is nullable. */
    private static final Schema FULL_WRITE_SCHEMA =
            Schema.builder()
                    .addByteArrayField("message_key")
                    .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
                    .addArrayField("headers", Schema.FieldType.row(Schemas.HEADERS_ENTRY_SCHEMA))
                    .addByteArrayField("payload")
                    .build();

    /** Read-side schema containing every field used by these tests; the timestamp is required. */
    private static final Schema FULL_READ_SCHEMA =
            Schema.builder()
                    .addByteArrayField("message_key")
                    .addDateTimeField("event_timestamp")
                    .addArrayField("headers", Schema.FieldType.row(Schemas.HEADERS_ENTRY_SCHEMA))
                    .addByteArrayField("payload")
                    .build();

    /** Mocked payload serializer; re-created before every test by {@link #setUp()}. */
    @Mock
    public PayloadSerializer serializer;

    /** Initializes the {@code @Mock}-annotated fields of this test instance. */
    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
    }

    /** Encodes {@code text} as UTF-8 bytes. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 {@code bytes} into a string. */
    private static String utf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Builds the header rows shared by the full round-trip tests: {@code key1} carrying
     * {@code attr1} and {@code attr2}, and {@code key2} carrying {@code attr3}.
     */
    private static ImmutableList<Row> sampleHeaderRows() {
        Row key1 =
                Row.withSchema(Schemas.HEADERS_ENTRY_SCHEMA)
                        .attachValues("key1", ImmutableList.of(utf8("attr1"), utf8("attr2")));
        Row key2 =
                Row.withSchema(Schemas.HEADERS_ENTRY_SCHEMA)
                        .attachValues("key2", ImmutableList.of(utf8("attr3")));
        return ImmutableList.of(key1, key2);
    }

    /** Creates a table over {@link #TOPIC} with the given schema and optional payload serializer. */
    private NestedPayloadKafkaTable newTable(Schema schema, Optional<PayloadSerializer> optional) {
        return new NestedPayloadKafkaTable(schema, "abc.bootstrap", ImmutableList.of(TOPIC), optional);
    }

    /** Each schema/serializer combination below must be rejected with IllegalArgumentException. */
    @Test
    public void constructionFailures() {
        // Schema with only a bytes payload field (no headers field), no serializer.
        Assert.assertThrows(
                IllegalArgumentException.class,
                () -> newTable(Schema.builder().addByteArrayField("payload").build(), Optional.empty()));

        // Row-typed payload, but no serializer supplied.
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        newTable(
                                Schema.builder()
                                        .addRowField("payload", Schema.builder().addStringField("abc").build())
                                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                        .build(),
                                Optional.empty()));

        // Bytes-typed payload, but a serializer is supplied.
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        newTable(
                                Schema.builder()
                                        .addByteArrayField("payload")
                                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                        .build(),
                                Optional.of(this.serializer)));

        // Additional boolean field named "bad".
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        newTable(
                                Schema.builder()
                                        .addByteArrayField("payload")
                                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                        .addBooleanField("bad")
                                        .build(),
                                Optional.empty()));

        // "event_timestamp" declared as a boolean field.
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        newTable(
                                Schema.builder()
                                        .addByteArrayField("payload")
                                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                        .addBooleanField("event_timestamp")
                                        .build(),
                                Optional.empty()));
    }

    /**
     * Builds a Kafka record on {@link #TOPIC} (partition 0, offset 0, LOG_APPEND_TIME) with the
     * given key, value and timestamp, copying every multimap entry into the record headers.
     */
    private static KafkaRecord<byte[], byte[]> readRecord(
            byte[] key, byte[] value, long timestamp, ListMultimap<String, byte[]> attributes) {
        RecordHeaders headers = new RecordHeaders();
        attributes.forEach(headers::add);
        return new KafkaRecord<>(
                TOPIC, 0, 0L, timestamp, KafkaTimestampType.LOG_APPEND_TIME, headers, key, value);
    }

    /** transformInput must surface IllegalArgumentException for the inputs below. */
    @Test
    public void recordToRowFailures() {
        // Case 1: the (mocked) serializer throws while deserializing the payload.
        NestedPayloadKafkaTable rowPayloadTable =
                newTable(
                        Schema.builder()
                                .addRowField("payload", Schema.builder().addStringField("def").build())
                                .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                .build(),
                        Optional.of(this.serializer));
        Mockito.doThrow(new IllegalArgumentException(""))
                .when(this.serializer)
                .deserialize(ArgumentMatchers.any());
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        rowPayloadTable.transformInput(
                                readRecord(new byte[0], utf8("abc"), 123L, ImmutableListMultimap.of())));

        // Case 2: the schema has a headers field but the record carries null headers.
        NestedPayloadKafkaTable bytesPayloadTable =
                newTable(
                        Schema.builder()
                                .addByteArrayField("payload")
                                .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                                .build(),
                        Optional.empty());
        Assert.assertThrows(
                IllegalArgumentException.class,
                () ->
                        bytesPayloadTable.transformInput(
                                new KafkaRecord<>(
                                        TOPIC,
                                        0,
                                        0L,
                                        0L,
                                        KafkaTimestampType.LOG_APPEND_TIME,
                                        (Headers) null,
                                        new byte[0],
                                        new byte[0])));
    }

    /** transformOutput must surface IllegalArgumentException for the inputs below. */
    @Test
    public void rowToRecordFailures() {
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        Schema tableSchema =
                Schema.builder()
                        .addRowField("payload", payloadSchema)
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE.withNullable(true))
                        .build();
        NestedPayloadKafkaTable table = newTable(tableSchema, Optional.of(this.serializer));

        // Case 1: a row whose schema (single string field "xxx") does not match the table schema.
        Schema badRowSchema = Schema.builder().addStringField("xxx").build();
        Row badRow =
                Row.withSchema(badRowSchema)
                        .attachValues(Row.withSchema(badRowSchema).attachValues("abc"));
        Assert.assertThrows(IllegalArgumentException.class, () -> table.transformOutput(badRow));

        // Case 2: a row matching the table schema, but the (mocked) serializer throws.
        Row goodRow =
                Row.withSchema(tableSchema)
                        .withFieldValue(
                                "payload", Row.withSchema(payloadSchema).withFieldValue("def", "abc").build())
                        .build();
        Mockito.doThrow(new IllegalArgumentException(""))
                .when(this.serializer)
                .serialize(ArgumentMatchers.any());
        Assert.assertThrows(IllegalArgumentException.class, () -> table.transformOutput(goodRow));
    }

    /**
     * A row whose fields are declared in a different order than the table schema is still written:
     * the payload bytes become the record value and the empty header list yields no headers.
     */
    @Test
    public void reorderRowToRecord() {
        Schema tableSchema =
                Schema.builder()
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .addByteArrayField("payload")
                        .build();
        Schema rowSchema =
                Schema.builder()
                        .addByteArrayField("payload")
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .build();
        NestedPayloadKafkaTable table = newTable(tableSchema, Optional.empty());
        Row row = Row.withSchema(rowSchema).attachValues(utf8("abc"), ImmutableList.of());

        // Keep the record in a local so both assertions inspect the same output.
        ProducerRecord<byte[], byte[]> output = table.transformOutput(row);

        Assert.assertEquals("abc", utf8(output.value()));
        Assert.assertEquals(0, output.headers().toArray().length);
    }

    /** Writes a row with every field populated and checks key, value, timestamp and headers. */
    @Test
    public void fullRowToRecord() {
        NestedPayloadKafkaTable table = newTable(FULL_WRITE_SCHEMA, Optional.empty());
        Instant now = Instant.now();
        Row row =
                Row.withSchema(FULL_WRITE_SCHEMA)
                        .withFieldValue("message_key", utf8("val1"))
                        .withFieldValue("payload", utf8("val2"))
                        .withFieldValue("event_timestamp", now)
                        .withFieldValue("headers", sampleHeaderRows())
                        .build();

        ProducerRecord<byte[], byte[]> output = table.transformOutput(row);

        Assert.assertEquals("val1", utf8(output.key()));
        Assert.assertEquals("val2", utf8(output.value()));
        // longValue() keeps this on the (long, long) overload of assertEquals.
        Assert.assertEquals(now.getMillis(), output.timestamp().longValue());
        ImmutableList<Header> key1Headers = ImmutableList.copyOf(output.headers().headers("key1"));
        ImmutableList<Header> key2Headers = ImmutableList.copyOf(output.headers().headers("key2"));
        Assert.assertEquals(2, key1Headers.size());
        Assert.assertEquals(1, key2Headers.size());
        Assert.assertEquals("attr3", utf8(key2Headers.get(0).value()));
    }

    /** Reads a record with key, value, timestamp and headers and checks the resulting row. */
    @Test
    public void fullRecordToRow() {
        NestedPayloadKafkaTable table = newTable(FULL_READ_SCHEMA, Optional.empty());
        Instant now = Instant.now();
        Row expected =
                Row.withSchema(FULL_READ_SCHEMA)
                        .withFieldValue("message_key", utf8("key"))
                        .withFieldValue("payload", utf8("value"))
                        .withFieldValue("event_timestamp", now)
                        .withFieldValue("headers", sampleHeaderRows())
                        .build();
        KafkaRecord<byte[], byte[]> record =
                readRecord(
                        utf8("key"),
                        utf8("value"),
                        now.getMillis(),
                        ImmutableListMultimap.of(
                                "key1", utf8("attr1"),
                                "key1", utf8("attr2"),
                                "key2", utf8("attr3")));

        Assert.assertEquals(expected, table.transformInput(record));
    }
}
