/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.Schemas;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link NestedPayloadKafkaTable}.
 *
 * <p>The tests exercise three things: schema validation in the constructor, conversion of a
 * {@link KafkaRecord} into a {@link Row} via {@code transformInput}, and conversion of a
 * {@link Row} into a {@link ProducerRecord} via {@code transformOutput}.
 */
@RunWith(JUnit4.class)
public class NestedPayloadKafkaTableTest {
    private static final String TOPIC = "mytopic";

    /** Write-side schema with every supported field; the event timestamp is nullable. */
    private static final Schema FULL_WRITE_SCHEMA =
        Schema.builder()
            .addByteArrayField("message_key")
            .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
            .addArrayField("headers", Schema.FieldType.row(Schemas.HEADERS_ENTRY_SCHEMA))
            .addByteArrayField("payload")
            .build();

    /** Read-side schema with every supported field; the event timestamp is non-nullable. */
    private static final Schema FULL_READ_SCHEMA =
        Schema.builder()
            .addByteArrayField("message_key")
            .addDateTimeField("event_timestamp")
            .addArrayField("headers", Schema.FieldType.row(Schemas.HEADERS_ENTRY_SCHEMA))
            .addByteArrayField("payload")
            .build();

    @Mock
    public PayloadSerializer serializer;

    /** Initializes the {@code @Mock}-annotated fields before each test. */
    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
    }

    /** Builds a table over {@link #TOPIC} with a fixed bootstrap server string. */
    private NestedPayloadKafkaTable newTable(Schema schema, Optional<PayloadSerializer> serializer) {
        return new NestedPayloadKafkaTable(schema, "abc.bootstrap", ImmutableList.of(TOPIC), serializer);
    }

    /** Encodes {@code text} as UTF-8 bytes. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Builds one row of {@code Schemas.HEADERS_ENTRY_SCHEMA}: a header key plus its values. */
    private static Row headerEntry(String key, byte[]... values) {
        return Row.withSchema(Schemas.HEADERS_ENTRY_SCHEMA).attachValues(key, ImmutableList.copyOf(values));
    }

    /**
     * Every schema/serializer combination below must be rejected by the constructor with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void constructionFailures() {
        // Bytes payload only, no headers field, no serializer.
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> this.newTable(Schema.builder().addByteArrayField("payload").build(), Optional.empty()));
        // Row-typed payload but no serializer supplied.
        Assert.assertThrows(
            IllegalArgumentException.class,
            () ->
                this.newTable(
                    Schema.builder()
                        .addRowField("payload", Schema.builder().addStringField("abc").build())
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .build(),
                    Optional.empty()));
        // Bytes-typed payload but a serializer is supplied.
        Assert.assertThrows(
            IllegalArgumentException.class,
            () ->
                this.newTable(
                    Schema.builder()
                        .addByteArrayField("payload")
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .build(),
                    Optional.of(this.serializer)));
        // Extra boolean field named "bad".
        Assert.assertThrows(
            IllegalArgumentException.class,
            () ->
                this.newTable(
                    Schema.builder()
                        .addByteArrayField("payload")
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .addBooleanField("bad")
                        .build(),
                    Optional.empty()));
        // "event_timestamp" declared as a boolean.
        Assert.assertThrows(
            IllegalArgumentException.class,
            () ->
                this.newTable(
                    Schema.builder()
                        .addByteArrayField("payload")
                        .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                        .addBooleanField("event_timestamp")
                        .build(),
                    Optional.empty()));
    }

    /**
     * Builds an incoming record on {@link #TOPIC} (partition 0, offset 0, log-append timestamp
     * type) whose headers are populated from {@code attributes}, one header per multimap entry.
     */
    private static KafkaRecord<byte[], byte[]> readRecord(byte[] key, byte[] value, long timestamp, ListMultimap<String, byte[]> attributes) {
        Headers headers = new RecordHeaders();
        attributes.forEach(headers::add);
        return new KafkaRecord<>(TOPIC, 0, 0L, timestamp, KafkaTimestampType.LOG_APPEND_TIME, headers, key, value);
    }

    /**
     * {@code transformInput} must throw {@link IllegalArgumentException} when the serializer
     * fails to deserialize, and when the record is built with {@code null} headers although the
     * schema declares a headers field.
     */
    @Test
    public void recordToRowFailures() {
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        NestedPayloadKafkaTable table =
            this.newTable(
                Schema.builder()
                    .addRowField("payload", payloadSchema)
                    .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                    .build(),
                Optional.of(this.serializer));
        // The mocked serializer rejects any payload.
        Mockito.doThrow(new IllegalArgumentException("")).when(this.serializer).deserialize(ArgumentMatchers.any());
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> table.transformInput(readRecord(new byte[0], utf8("abc"), 123L, ImmutableListMultimap.of())));

        NestedPayloadKafkaTable table2 =
            this.newTable(
                Schema.builder()
                    .addByteArrayField("payload")
                    .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                    .build(),
                Optional.empty());
        // Record constructed with null headers.
        Assert.assertThrows(
            IllegalArgumentException.class,
            () ->
                table2.transformInput(
                    new KafkaRecord<>(TOPIC, 0, 0L, 0L, KafkaTimestampType.LOG_APPEND_TIME, null, new byte[0], new byte[0])));
    }

    /**
     * {@code transformOutput} must throw {@link IllegalArgumentException} for a row whose schema
     * does not match the table, and when the serializer fails to serialize the payload.
     */
    @Test
    public void rowToRecordFailures() {
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        Schema schema =
            Schema.builder()
                .addRowField("payload", payloadSchema)
                .addField("headers", Schemas.HEADERS_FIELD_TYPE.withNullable(true))
                .build();
        NestedPayloadKafkaTable table = this.newTable(schema, Optional.of(this.serializer));

        // A row carrying an unrelated schema (single string field "xxx").
        Schema badRowSchema = Schema.builder().addStringField("xxx").build();
        Row badRow = Row.withSchema(badRowSchema).attachValues(Row.withSchema(badRowSchema).attachValues("abc"));
        Assert.assertThrows(IllegalArgumentException.class, () -> table.transformOutput(badRow));

        // A row matching the table schema; headers are nullable and left unset.
        Row goodRow =
            Row.withSchema(schema)
                .withFieldValue("payload", Row.withSchema(payloadSchema).withFieldValue("def", "abc").build())
                .build();
        // The mocked serializer rejects any row.
        Mockito.doThrow(new IllegalArgumentException("")).when(this.serializer).serialize(ArgumentMatchers.any());
        Assert.assertThrows(IllegalArgumentException.class, () -> table.transformOutput(goodRow));
    }

    /**
     * A row whose fields are in a different order than the table schema (payload first versus
     * headers first) is still converted: the payload becomes the record value and the empty
     * headers list yields no record headers.
     */
    @Test
    public void reorderRowToRecord() {
        Schema schema =
            Schema.builder()
                .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                .addByteArrayField("payload")
                .build();
        Schema rowSchema =
            Schema.builder()
                .addByteArrayField("payload")
                .addField("headers", Schemas.HEADERS_FIELD_TYPE)
                .build();
        NestedPayloadKafkaTable table = this.newTable(schema, Optional.empty());
        Row row = Row.withSchema(rowSchema).attachValues(utf8("abc"), ImmutableList.of());
        ProducerRecord<byte[], byte[]> output = table.transformOutput(row);
        Assert.assertEquals("abc", new String(output.value(), StandardCharsets.UTF_8));
        Assert.assertEquals(0L, output.headers().toArray().length);
    }

    /**
     * A row populated with key, payload, timestamp and two header keys is converted into a
     * producer record carrying the same key, value, timestamp and per-key header counts.
     */
    @Test
    public void fullRowToRecord() {
        NestedPayloadKafkaTable table = this.newTable(FULL_WRITE_SCHEMA, Optional.empty());
        Instant now = Instant.now();
        Row row =
            Row.withSchema(FULL_WRITE_SCHEMA)
                .withFieldValue("message_key", utf8("val1"))
                .withFieldValue("payload", utf8("val2"))
                .withFieldValue("event_timestamp", now)
                .withFieldValue(
                    "headers",
                    ImmutableList.of(
                        headerEntry("key1", utf8("attr1"), utf8("attr2")),
                        headerEntry("key2", utf8("attr3"))))
                .build();
        ProducerRecord<byte[], byte[]> result = table.transformOutput(row);
        Assert.assertEquals("val1", new String(result.key(), StandardCharsets.UTF_8));
        Assert.assertEquals("val2", new String(result.value(), StandardCharsets.UTF_8));
        // Unbox explicitly: assertEquals(long, Long) would be ambiguous between overloads.
        Assert.assertEquals(now.getMillis(), result.timestamp().longValue());
        List<Header> key1Headers = ImmutableList.copyOf(result.headers().headers("key1"));
        List<Header> key2Headers = ImmutableList.copyOf(result.headers().headers("key2"));
        Assert.assertEquals(2L, key1Headers.size());
        Assert.assertEquals(1L, key2Headers.size());
        Assert.assertEquals("attr3", new String(key2Headers.get(0).value(), StandardCharsets.UTF_8));
    }

    /**
     * A Kafka record with key, value, timestamp and repeated header keys is converted into the
     * expected row, with header values grouped under their key.
     */
    @Test
    public void fullRecordToRow() {
        NestedPayloadKafkaTable table = this.newTable(FULL_READ_SCHEMA, Optional.empty());
        Instant event = Instant.now();
        KafkaRecord<byte[], byte[]> record =
            readRecord(
                utf8("key"),
                utf8("value"),
                event.getMillis(),
                ImmutableListMultimap.of(
                    "key1", utf8("attr1"),
                    "key1", utf8("attr2"),
                    "key2", utf8("attr3")));
        Row expected =
            Row.withSchema(FULL_READ_SCHEMA)
                .withFieldValue("message_key", utf8("key"))
                .withFieldValue("payload", utf8("value"))
                .withFieldValue("event_timestamp", event)
                .withFieldValue(
                    "headers",
                    ImmutableList.of(
                        headerEntry("key1", utf8("attr1"), utf8("attr2")),
                        headerEntry("key2", utf8("attr3"))))
                .build();
        Assert.assertEquals(expected, table.transformInput(record));
    }
}

