package org.apache.beam.sdk.extensions.sql.meta.provider.pubsublite;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Unit tests for {@link RowHandler}, covering construction-time validation and the conversions
 * between Beam {@link Row}s and Pub/Sub Lite messages in both directions.
 */
@RunWith(JUnit4.class)
public class RowHandlerTest {
    /** Schema with every field used on the write path (no publish timestamp). */
    private static final Schema FULL_WRITE_SCHEMA =
            Schema.builder()
                    .addByteArrayField("message_key")
                    .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
                    .addArrayField("attributes", Schema.FieldType.row(RowHandler.ATTRIBUTES_ENTRY_SCHEMA))
                    .addByteArrayField("payload")
                    .build();

    /** Same fields as {@link #FULL_WRITE_SCHEMA} plus the {@code publish_timestamp} field. */
    private static final Schema FULL_READ_SCHEMA =
            Schema.builder()
                    .addByteArrayField("message_key")
                    .addField("event_timestamp", Schema.FieldType.DATETIME.withNullable(true))
                    .addArrayField("attributes", Schema.FieldType.row(RowHandler.ATTRIBUTES_ENTRY_SCHEMA))
                    .addByteArrayField("payload")
                    .addDateTimeField("publish_timestamp")
                    .build();

    @Mock
    public PayloadSerializer serializer;

    /** Initializes the {@code @Mock}-annotated fields before each test. */
    @Before
    public void setUp() {
        MockitoAnnotations.openMocks(this);
    }

    /** Construction must be rejected when the payload field type and serializer presence disagree. */
    @Test
    public void constructionFailures() {
        // Row-typed payload, but no serializer supplied.
        Schema rowPayloadSchema =
                Schema.builder()
                        .addRowField("payload", Schema.builder().addStringField("abc").build())
                        .build();
        Assert.assertThrows(IllegalArgumentException.class, () -> new RowHandler(rowPayloadSchema));

        // Byte-array payload, but a serializer supplied anyway.
        Schema bytesPayloadSchema = Schema.builder().addByteArrayField("payload").build();
        Assert.assertThrows(
                IllegalArgumentException.class, () -> new RowHandler(bytesPayloadSchema, this.serializer));
    }

    /** {@code messageToRow} must fail with {@link IllegalArgumentException} for unusable messages. */
    @Test
    public void messageToRowFailures() {
        // Case 1: the (mocked) serializer rejects the payload bytes.
        Schema rowPayloadSchema =
                Schema.builder()
                        .addRowField("payload", Schema.builder().addStringField("def").build())
                        .build();
        RowHandler serializingHandler = new RowHandler(rowPayloadSchema, this.serializer);
        Mockito.doThrow(new IllegalArgumentException(""))
                .when(this.serializer)
                .deserialize(ArgumentMatchers.any());
        Assert.assertThrows(
                IllegalArgumentException.class,
                () -> serializingHandler.messageToRow(SequencedMessage.getDefaultInstance()));

        // Case 2: the schema declares a non-nullable event_timestamp, while the default message
        // instance carries no event time.
        Schema requiredTimestampSchema =
                Schema.builder().addByteArrayField("payload").addDateTimeField("event_timestamp").build();
        RowHandler bytesHandler = new RowHandler(requiredTimestampSchema);
        Assert.assertThrows(
                IllegalArgumentException.class,
                () -> bytesHandler.messageToRow(SequencedMessage.getDefaultInstance()));
    }

    /** {@code rowToMessage} must fail with {@link IllegalArgumentException} for unusable rows. */
    @Test
    public void rowToMessageFailures() {
        Schema payloadSchema = Schema.builder().addStringField("def").build();
        Schema handlerSchema = Schema.builder().addRowField("payload", payloadSchema).build();
        RowHandler rowHandler = new RowHandler(handlerSchema, this.serializer);

        // Case 1: a row built against a schema different from the handler's schema.
        Schema mismatchedSchema = Schema.builder().addStringField("xxx").build();
        Row mismatchedRow =
                Row.withSchema(mismatchedSchema)
                        .attachValues(Row.withSchema(mismatchedSchema).attachValues("abc"));
        Assert.assertThrows(
                IllegalArgumentException.class, () -> rowHandler.rowToMessage(mismatchedRow));

        // Case 2: a row matching the handler's schema, but the (mocked) serializer throws.
        Row matchingRow =
                Row.withSchema(handlerSchema)
                        .addValue(Row.withSchema(payloadSchema).attachValues("abc"))
                        .build();
        Mockito.doThrow(new IllegalArgumentException(""))
                .when(this.serializer)
                .serialize(ArgumentMatchers.any());
        Assert.assertThrows(
                IllegalArgumentException.class, () -> rowHandler.rowToMessage(matchingRow));
    }

    /** A row whose fields are ordered differently from the handler's schema still converts. */
    @Test
    public void reorderRowToMessage() {
        Schema handlerSchema =
                Schema.builder().addByteArrayField("message_key").addByteArrayField("payload").build();
        // Same two fields as the handler schema, declared in the opposite order.
        Schema reorderedSchema =
                Schema.builder().addByteArrayField("payload").addByteArrayField("message_key").build();
        Row row = Row.withSchema(reorderedSchema).attachValues(utf8("abc"), utf8("def"));

        PubSubMessage expected =
                PubSubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8("abc"))
                        .setKey(ByteString.copyFromUtf8("def"))
                        .build();

        Assert.assertEquals(expected, new RowHandler(handlerSchema).rowToMessage(row));
    }

    /** A row populating every write-schema field converts to the equivalent message. */
    @Test
    public void fullRowToMessage() {
        RowHandler rowHandler = new RowHandler(FULL_WRITE_SCHEMA);
        Instant eventTime = Instant.now();

        Row row =
                Row.withSchema(FULL_WRITE_SCHEMA)
                        .withFieldValue("message_key", utf8("val1"))
                        .withFieldValue("payload", utf8("val2"))
                        .withFieldValue("event_timestamp", eventTime)
                        .withFieldValue("attributes", sampleAttributeRows())
                        .build();

        Assert.assertEquals(sampleMessage(eventTime), rowHandler.rowToMessage(row));
    }

    /** A sequenced message populating every field converts to the equivalent read-schema row. */
    @Test
    public void fullMessageToRow() {
        RowHandler rowHandler = new RowHandler(FULL_READ_SCHEMA);
        Instant eventTime = Instant.now();
        Instant publishTime = Instant.now();

        Row expected =
                Row.withSchema(FULL_READ_SCHEMA)
                        .withFieldValue("message_key", utf8("val1"))
                        .withFieldValue("payload", utf8("val2"))
                        .withFieldValue("event_timestamp", eventTime)
                        .withFieldValue("publish_timestamp", publishTime)
                        .withFieldValue("attributes", sampleAttributeRows())
                        .build();

        SequencedMessage sequenced =
                SequencedMessage.newBuilder()
                        .setMessage(sampleMessage(eventTime))
                        .setPublishTime(Timestamps.fromMillis(publishTime.getMillis()))
                        .build();

        Assert.assertEquals(expected, rowHandler.messageToRow(sequenced));
    }

    /** Returns the UTF-8 encoding of {@code text}. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Returns the attribute entry rows shared by the full-conversion tests: {@code key1} with
     * values {@code attr1, attr2} and {@code key2} with value {@code attr3}.
     */
    private static ImmutableList<Row> sampleAttributeRows() {
        Row key1Entry =
                Row.withSchema(RowHandler.ATTRIBUTES_ENTRY_SCHEMA)
                        .attachValues("key1", ImmutableList.of(utf8("attr1"), utf8("attr2")));
        Row key2Entry =
                Row.withSchema(RowHandler.ATTRIBUTES_ENTRY_SCHEMA)
                        .attachValues("key2", ImmutableList.of(utf8("attr3")));
        return ImmutableList.of(key1Entry, key2Entry);
    }

    /**
     * Returns the message shared by the full-conversion tests, carrying the same key, payload and
     * attributes as the rows built there.
     *
     * @param eventTime instant stored (at millisecond precision) as the message's event time
     */
    private static PubSubMessage sampleMessage(Instant eventTime) {
        AttributeValues key1Values =
                AttributeValues.newBuilder()
                        .addValues(ByteString.copyFromUtf8("attr1"))
                        .addValues(ByteString.copyFromUtf8("attr2"))
                        .build();
        AttributeValues key2Values =
                AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("attr3")).build();
        return PubSubMessage.newBuilder()
                .setKey(ByteString.copyFromUtf8("val1"))
                .setData(ByteString.copyFromUtf8("val2"))
                .setEventTime(Timestamps.fromMillis(eventTime.getMillis()))
                .putAttributes("key1", key1Values)
                .putAttributes("key2", key2Values)
                .build();
    }
}
