/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite.RowHandler;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link RowHandler}: constructor validation, failure propagation in both
 * conversion directions, field reordering, and full round trips between Beam {@link Row}s and
 * Pub/Sub Lite messages.
 */
@RunWith(JUnit4.class)
public class RowHandlerTest {
    /** Schema with every field exercised on the write path (row to message). */
    private static final Schema FULL_WRITE_SCHEMA =
        Schema.builder()
            .addByteArrayField("message_key")
            .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
            .addArrayField("attributes", Schema.FieldType.row(RowHandler.ATTRIBUTES_ENTRY_SCHEMA))
            .addByteArrayField("payload")
            .build();

    /** Same as {@link #FULL_WRITE_SCHEMA} plus the {@code publish_timestamp} field. */
    private static final Schema FULL_READ_SCHEMA =
        Schema.builder()
            .addByteArrayField("message_key")
            .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
            .addArrayField("attributes", Schema.FieldType.row(RowHandler.ATTRIBUTES_ENTRY_SCHEMA))
            .addByteArrayField("payload")
            .addDateTimeField("publish_timestamp")
            .build();

    /**
     * Fixed timestamps keep the tests reproducible. The two values differ on purpose so that a
     * handler that mixed up the event and publish timestamps would fail {@link #fullMessageToRow}.
     */
    private static final Instant EVENT_TIME = new Instant(1_600_000_000_000L);

    private static final Instant PUBLISH_TIME = EVENT_TIME.plus(5_000L);

    @Mock
    public PayloadSerializer serializer;

    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
    }

    /** Encodes {@code text} as UTF-8 bytes. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Row form of the attributes carried by {@link #fullUserMessage()}. */
    private static ImmutableList<Row> fullAttributeRows() {
        Row first =
            Row.withSchema(RowHandler.ATTRIBUTES_ENTRY_SCHEMA)
                .attachValues("key1", ImmutableList.of(utf8("attr1"), utf8("attr2")));
        Row second =
            Row.withSchema(RowHandler.ATTRIBUTES_ENTRY_SCHEMA)
                .attachValues("key2", ImmutableList.of(utf8("attr3")));
        return ImmutableList.of(first, second);
    }

    /** Message with key, data, event time ({@link #EVENT_TIME}) and two attributes populated. */
    private static PubSubMessage fullUserMessage() {
        AttributeValues firstValues =
            AttributeValues.newBuilder()
                .addValues(ByteString.copyFromUtf8("attr1"))
                .addValues(ByteString.copyFromUtf8("attr2"))
                .build();
        AttributeValues secondValues =
            AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("attr3")).build();
        return PubSubMessage.newBuilder()
            .setKey(ByteString.copyFromUtf8("val1"))
            .setData(ByteString.copyFromUtf8("val2"))
            .setEventTime(Timestamps.fromMillis(EVENT_TIME.getMillis()))
            .putAttributes("key1", firstValues)
            .putAttributes("key2", secondValues)
            .build();
    }

    /** The constructors are expected to reject a schema that does not match the serializer use. */
    @Test
    public void constructionFailures() {
        // Row-typed payload, but constructed without a serializer.
        Schema rowPayloadSchema =
            Schema.builder()
                .addRowField("payload", Schema.builder().addStringField("abc").build())
                .build();
        Assert.assertThrows(IllegalArgumentException.class, () -> new RowHandler(rowPayloadSchema));

        // Byte-array payload, but constructed with a serializer.
        Schema bytesPayloadSchema = Schema.builder().addByteArrayField("payload").build();
        Assert.assertThrows(
            IllegalArgumentException.class, () -> new RowHandler(bytesPayloadSchema, serializer));
    }

    /** {@code messageToRow} is expected to throw {@link IllegalArgumentException} on bad input. */
    @Test
    public void messageToRowFailures() {
        // A failure raised by the payload serializer must reach the caller.
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        RowHandler serializingHandler =
            new RowHandler(Schema.builder().addRowField("payload", payloadSchema).build(), serializer);
        Mockito.doThrow(new IllegalArgumentException(""))
            .when(serializer)
            .deserialize(ArgumentMatchers.any());
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> serializingHandler.messageToRow(SequencedMessage.getDefaultInstance()));

        // The schema declares a non-nullable event timestamp; the default message has none.
        RowHandler timestampHandler =
            new RowHandler(
                Schema.builder()
                    .addByteArrayField("payload")
                    .addDateTimeField("event_timestamp")
                    .build());
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> timestampHandler.messageToRow(SequencedMessage.getDefaultInstance()));
    }

    /** {@code rowToMessage} is expected to throw {@link IllegalArgumentException} on bad input. */
    @Test
    public void rowToMessageFailures() {
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        Schema schema = Schema.builder().addRowField("payload", payloadSchema).build();
        RowHandler rowHandler = new RowHandler(schema, serializer);

        // A row whose schema does not match the handler's schema.
        Schema badRowSchema = Schema.builder().addStringField("xxx").build();
        Row badRow =
            Row.withSchema(badRowSchema)
                .attachValues(Row.withSchema(badRowSchema).attachValues("abc"));
        Assert.assertThrows(IllegalArgumentException.class, () -> rowHandler.rowToMessage(badRow));

        // A well-formed row whose payload serialization fails.
        Row goodRow =
            Row.withSchema(schema)
                .addValue(Row.withSchema(payloadSchema).attachValues("abc"))
                .build();
        Mockito.doThrow(new IllegalArgumentException(""))
            .when(serializer)
            .serialize(ArgumentMatchers.any());
        Assert.assertThrows(IllegalArgumentException.class, () -> rowHandler.rowToMessage(goodRow));
    }

    /** Fields are matched by name, so a row with a different field order still converts. */
    @Test
    public void reorderRowToMessage() {
        Schema schema =
            Schema.builder().addByteArrayField("message_key").addByteArrayField("payload").build();
        Schema rowSchema =
            Schema.builder().addByteArrayField("payload").addByteArrayField("message_key").build();
        RowHandler rowHandler = new RowHandler(schema);

        // Values follow rowSchema order: payload first, then message_key.
        Row row = Row.withSchema(rowSchema).attachValues(utf8("abc"), utf8("def"));

        PubSubMessage expected =
            PubSubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8("abc"))
                .setKey(ByteString.copyFromUtf8("def"))
                .build();
        Assert.assertEquals(expected, rowHandler.rowToMessage(row));
    }

    /** A row populating every write-schema field converts to the equivalent message. */
    @Test
    public void fullRowToMessage() {
        RowHandler rowHandler = new RowHandler(FULL_WRITE_SCHEMA);
        Row row =
            Row.withSchema(FULL_WRITE_SCHEMA)
                .withFieldValue("message_key", utf8("val1"))
                .withFieldValue("payload", utf8("val2"))
                .withFieldValue("event_timestamp", EVENT_TIME)
                .withFieldValue("attributes", fullAttributeRows())
                .build();

        Assert.assertEquals(fullUserMessage(), rowHandler.rowToMessage(row));
    }

    /** A sequenced message populating every read-schema field converts to the equivalent row. */
    @Test
    public void fullMessageToRow() {
        RowHandler rowHandler = new RowHandler(FULL_READ_SCHEMA);
        SequencedMessage sequencedMessage =
            SequencedMessage.newBuilder()
                .setMessage(fullUserMessage())
                .setPublishTime(Timestamps.fromMillis(PUBLISH_TIME.getMillis()))
                .build();

        Row expected =
            Row.withSchema(FULL_READ_SCHEMA)
                .withFieldValue("message_key", utf8("val1"))
                .withFieldValue("payload", utf8("val2"))
                .withFieldValue("event_timestamp", EVENT_TIME)
                .withFieldValue("publish_timestamp", PUBLISH_TIME)
                .withFieldValue("attributes", fullAttributeRows())
                .build();
        Assert.assertEquals(expected, rowHandler.messageToRow(sequencedMessage));
    }
}

