package org.apache.beam.sdk.io.gcp.pubsub;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.joda.time.ReadableDateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.class */
public class PubsubRowToMessageTest {

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public TestPipeline errorsTestPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
    private static final PipelineOptions PIPELINE_OPTIONS = PipelineOptionsFactory.create();
    private static final Schema.Field BOOLEAN_FIELD = Schema.Field.of("boolean", Schema.FieldType.BOOLEAN);
    private static final Schema.Field BYTE_FIELD = Schema.Field.of("byte", Schema.FieldType.BYTE);
    private static final Schema.Field DATETIME_FIELD = Schema.Field.of("datetime", Schema.FieldType.DATETIME);
    private static final Schema.Field DECIMAL_FIELD = Schema.Field.of("decimal", Schema.FieldType.DECIMAL);
    private static final Schema.Field DOUBLE_FIELD = Schema.Field.of("double", Schema.FieldType.DOUBLE);
    private static final Schema.Field FLOAT_FIELD = Schema.Field.of("float", Schema.FieldType.FLOAT);
    private static final Schema.Field INT16_FIELD = Schema.Field.of("int16", Schema.FieldType.INT16);
    private static final Schema.Field INT32_FIELD = Schema.Field.of("int32", Schema.FieldType.INT32);
    private static final Schema.Field INT64_FIELD = Schema.Field.of("int64", Schema.FieldType.INT64);
    private static final Schema.Field STRING_FIELD = Schema.Field.of("string", Schema.FieldType.STRING);
    private static final Schema ALL_DATA_TYPES_SCHEMA = Schema.of(new Schema.Field[]{BOOLEAN_FIELD, BYTE_FIELD, DATETIME_FIELD, DECIMAL_FIELD, DOUBLE_FIELD, FLOAT_FIELD, INT16_FIELD, INT32_FIELD, INT64_FIELD, STRING_FIELD});
    private static final String DEFAULT_ATTRIBUTES_KEY_NAME = "$pubsub_attributes";
    private static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME = "$pubsub_event_timestamp";
    private static final String DEFAULT_PAYLOAD_KEY_NAME = "$pubsub_payload";
    private static final Schema NON_USER_WITH_BYTES_PAYLOAD = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE), Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.BYTES)});
    private static final Schema NON_USER_WITH_ROW_PAYLOAD = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE), Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA))});
    private static final Schema NON_USER_WITHOUT_PAYLOAD = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)});

    @Test(expected = IllegalArgumentException.class)
    public void testExpandThrowsExceptions() {
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        Instant now = Instant.now();
        Instant ofEpochMilli = Instant.ofEpochMilli(now.getMillis() + 1000000);
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE)});
        Row attachValues = Row.withSchema(of).attachValues(new Object[]{ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111")});
        Schema of2 = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)});
        Row attachValues2 = Row.withSchema(of2).attachValues(new Object[]{now});
        Schema of3 = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.BYTES)});
        Row attachValues3 = Row.withSchema(of3).attachValues(new Object[]{new byte[]{1, 2, 3, 4}});
        Row rowWithAllDataTypes = rowWithAllDataTypes(true, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(3.12345d), Float.valueOf(4.1f), (short) 5, 2, 7L, "asdfjkl;");
        Schema of4 = Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA))});
        Row attachValues4 = Row.withSchema(of4).attachValues(new Object[]{rowWithAllDataTypes});
        PubsubRowToMessage build = PubsubRowToMessage.builder().setMockInstant(ofEpochMilli).build();
        PubsubRowToMessage build2 = PubsubRowToMessage.builder().setPayloadSerializer(serializer).build();
        this.errorsTestPipeline.apply(Create.of(attachValues, new Row[0])).setRowSchema(of).apply(build);
        this.errorsTestPipeline.apply(Create.of(attachValues2, new Row[0])).setRowSchema(of2).apply(build);
        this.errorsTestPipeline.apply(Create.of(attachValues3, new Row[0])).setRowSchema(of3).apply(build2);
        this.errorsTestPipeline.apply(Create.of(attachValues4, new Row[0])).setRowSchema(of4).apply(build);
        this.errorsTestPipeline.apply(Create.of(rowWithAllDataTypes, new Row[0])).setRowSchema(ALL_DATA_TYPES_SCHEMA).apply(build);
        this.errorsTestPipeline.apply(Create.of(merge(attachValues4, rowWithAllDataTypes), new Row[0])).setRowSchema(merge(of4, ALL_DATA_TYPES_SCHEMA)).apply(build2);
    }

    @Test
    public void testExpandError() {
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(Schema.of(new Schema.Field[]{STRING_FIELD}), ImmutableMap.of());
        PAssert.that(this.pipeline.apply(Create.of(rowWithAllDataTypes(true, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(3.12345d), Float.valueOf(4.1f), (short) 5, 2, 7L, "asdfjkl;"), new Row[0])).setRowSchema(ALL_DATA_TYPES_SCHEMA).apply(PubsubRowToMessage.builder().setPayloadSerializer(serializer).build()).get(PubsubRowToMessage.ERROR).apply(Count.globally())).containsInAnyOrder(new Long[]{1L});
        this.pipeline.run(PIPELINE_OPTIONS);
    }

    @Test
    public void testExpandOutput() {
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        Instant now = Instant.now();
        Instant ofEpochMilli = Instant.ofEpochMilli(now.getMillis() + 1000000);
        String instant = now.toString();
        String instant2 = ofEpochMilli.toString();
        byte[] bytes = "All the ��'s a ��, and all the �� and �� merely players. They ��️their ��and their ��️and 1️⃣��in his time ▶️many������".getBytes(StandardCharsets.UTF_8);
        Schema.Field of = Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE);
        Schema.Field of2 = Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE);
        Schema.Field of3 = Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.BYTES);
        Schema.Field of4 = Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA));
        Row rowWithAllDataTypes = rowWithAllDataTypes(true, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(3.12345d), Float.valueOf(4.1f), (short) 5, 2, 7L, "asdfjkl;");
        byte[] serialize = serializer.serialize(rowWithAllDataTypes);
        Schema of5 = Schema.of(new Schema.Field[]{of});
        Row attachValues = Row.withSchema(of5).attachValues(new Object[]{ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111")});
        Schema of6 = Schema.of(new Schema.Field[]{of2});
        Row attachValues2 = Row.withSchema(of6).attachValues(new Object[]{now});
        Schema of7 = Schema.of(new Schema.Field[]{of3});
        Row attachValues3 = Row.withSchema(of7).attachValues(new Object[]{bytes});
        Schema of8 = Schema.of(new Schema.Field[]{of4});
        Row attachValues4 = Row.withSchema(of8).attachValues(new Object[]{rowWithAllDataTypes});
        PubsubRowToMessage build = PubsubRowToMessage.builder().setMockInstant(ofEpochMilli).build();
        PubsubRowToMessage build2 = PubsubRowToMessage.builder().setMockInstant(ofEpochMilli).setPayloadSerializer(serializer).build();
        PAssert.that("expected: payload: user serialized row, attributes: ParDo mocked generated timestamp only", this.pipeline.apply(Create.of(rowWithAllDataTypes, new Row[0])).setRowSchema(ALL_DATA_TYPES_SCHEMA).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant2))});
        PAssert.that("expected: payload: user serialized row, attributes: input attributes and ParDo generated timestamp", this.pipeline.apply(Create.of(merge(attachValues, rowWithAllDataTypes), new Row[0])).setRowSchema(merge(of5, ALL_DATA_TYPES_SCHEMA)).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant2))});
        PAssert.that("expected: payload: user serialized row, attributes: timestamp from row input", this.pipeline.apply(Create.of(merge(attachValues2, rowWithAllDataTypes), new Row[0])).setRowSchema(merge(of6, ALL_DATA_TYPES_SCHEMA)).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        PAssert.that("expected: payload: raw bytes, attributes: ParDo generated timestamp", this.pipeline.apply(Create.of(attachValues3, new Row[0])).setRowSchema(of7).apply(build).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant2))});
        PAssert.that("expected: payload: non-user row, attributes: ParDo generated timestamp", this.pipeline.apply(Create.of(attachValues4, new Row[0])).setRowSchema(of8).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant2))});
        PAssert.that("expected: raw bytes, attributes: embedded attributes and ParDo generated timestamp", this.pipeline.apply(Create.of(merge(attachValues, attachValues3), new Row[0])).setRowSchema(merge(of5, of7)).apply(build).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bytes, ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant2))});
        PAssert.that("expected: payload: raw bytes, attributes: timestamp from row input", this.pipeline.apply(Create.of(merge(attachValues2, attachValues3), new Row[0])).setRowSchema(merge(of6, of7)).apply(build).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        PAssert.that("expected: payload: raw bytes, attributes: from row input including its embedded timestamp", this.pipeline.apply(Create.of(merge(merge(attachValues, attachValues2), attachValues3), new Row[0])).setRowSchema(merge(merge(of5, of6), of7)).apply(build).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bytes, ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        PAssert.that("expected: payload: serialized row payload input, attributes: from row input including its embedded timestamp", this.pipeline.apply(Create.of(merge(merge(attachValues, attachValues2), attachValues4), new Row[0])).setRowSchema(merge(merge(of5, of6), of8)).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        PAssert.that("expected: payload: serialized user fields, attributes: from row input including embedded timestamp", this.pipeline.apply(Create.of(merge(merge(attachValues, attachValues2), rowWithAllDataTypes), new Row[0])).setRowSchema(merge(merge(of5, of6), ALL_DATA_TYPES_SCHEMA)).apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(serialize, ImmutableMap.of(SpannerIOReadTest.DATABASE_ID, "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        this.pipeline.run(PIPELINE_OPTIONS);
    }

    @Test
    public void testKeyPrefix() {
        PubsubRowToMessage build = PubsubRowToMessage.builder().build();
        Assert.assertEquals(DEFAULT_ATTRIBUTES_KEY_NAME, build.getAttributesKeyName());
        Assert.assertEquals(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, build.getSourceEventTimestampKeyName());
        Assert.assertEquals(DEFAULT_PAYLOAD_KEY_NAME, build.getPayloadKeyName());
        PubsubRowToMessage build2 = PubsubRowToMessage.builder().setKeyPrefix("_").build();
        Assert.assertEquals("_pubsub_attributes", build2.getAttributesKeyName());
        Assert.assertEquals("_pubsub_event_timestamp", build2.getSourceEventTimestampKeyName());
        Assert.assertEquals("_pubsub_payload", build2.getPayloadKeyName());
    }

    @Test
    public void testSetTargetTimestampAttributeName() {
        Instant now = Instant.now();
        String instant = now.toString();
        byte[] bArr = {1, 2, 3, 4};
        Schema.Field of = Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.BYTES);
        Row attachValues = Row.withSchema(Schema.of(new Schema.Field[]{of})).attachValues(new Object[]{bArr});
        PubsubRowToMessage build = PubsubRowToMessage.builder().setMockInstant(now).build();
        PubsubRowToMessage build2 = PubsubRowToMessage.builder().setMockInstant(now).setTargetTimestampAttributeName("custom_timestamp_key").build();
        PCollection rowSchema = this.pipeline.apply(Create.of(attachValues, new Row[0])).setRowSchema(Schema.of(new Schema.Field[]{of}));
        PAssert.that(rowSchema.apply(build).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bArr, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, instant))});
        PAssert.that(rowSchema.apply(build2).get(PubsubRowToMessage.OUTPUT)).containsInAnyOrder(new PubsubMessage[]{new PubsubMessage(bArr, ImmutableMap.of("custom_timestamp_key", instant))});
        this.pipeline.run(PIPELINE_OPTIONS);
    }

    @Test
    public void testInputSchemaFactory() {
        Assert.assertEquals(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)}), PubsubRowToMessage.builder().build().inputSchemaFactory().buildSchema(new Schema.Field[0]));
        Assert.assertEquals(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE), Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.BYTES)}), PubsubRowToMessage.builder().build().inputSchemaFactory(Schema.FieldType.BYTES).buildSchema(new Schema.Field[0]));
        Assert.assertEquals(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE), Schema.Field.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA))}), PubsubRowToMessage.builder().build().inputSchemaFactory(Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA)).buildSchema(new Schema.Field[0]));
        Assert.assertEquals(Schema.of(new Schema.Field[]{Schema.Field.of("_pubsub_attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of("_pubsub_event_timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)}), PubsubRowToMessage.builder().setKeyPrefix("_").build().inputSchemaFactory().buildSchema(new Schema.Field[0]));
        Assert.assertEquals(merge(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE), Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)}), ALL_DATA_TYPES_SCHEMA), PubsubRowToMessage.builder().build().inputSchemaFactory().buildSchema((Schema.Field[]) ALL_DATA_TYPES_SCHEMA.getFields().toArray(new Schema.Field[0])));
    }

    @Test
    public void testValidate() {
        PubsubRowToMessage build = PubsubRowToMessage.builder().build();
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validate(Schema.builder().build());
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validate(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.STRING)}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validate(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES))}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validate(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.BYTES, Schema.FieldType.STRING))}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validate(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, Schema.FieldType.STRING)}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validate(ALL_DATA_TYPES_SCHEMA);
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validate(NON_USER_WITH_ROW_PAYLOAD);
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().setPayloadSerializer(serializer).build().validate(NON_USER_WITH_BYTES_PAYLOAD);
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_BYTES_PAYLOAD));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_ROW_PAYLOAD));
        });
    }

    @Test
    public void testValidateAttributesField() {
        PubsubRowToMessage build = PubsubRowToMessage.builder().build();
        build.validateAttributesField(ALL_DATA_TYPES_SCHEMA);
        build.validateAttributesField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE)}));
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validateAttributesField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.STRING)}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validateAttributesField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES))}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validateAttributesField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.BYTES, Schema.FieldType.STRING))}));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
        });
    }

    @Test
    public void testValidateSourceEventTimeStampField() {
        PubsubRowToMessage build = PubsubRowToMessage.builder().build();
        build.validateSourceEventTimeStampField(ALL_DATA_TYPES_SCHEMA);
        build.validateSourceEventTimeStampField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)}));
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            build.validateSourceEventTimeStampField(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, Schema.FieldType.STRING)}));
        });
    }

    @Test
    public void testShouldValidatePayloadSerializerNullState() {
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        PayloadSerializer serializer2 = new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        PubsubRowToMessage.builder().setPayloadSerializer(serializer).build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
        PubsubRowToMessage.builder().setPayloadSerializer(serializer2).build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
        PubsubRowToMessage.builder().setPayloadSerializer(serializer).build().validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
        PubsubRowToMessage.builder().setPayloadSerializer(serializer2).build().validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
        PubsubRowToMessage.builder().build().validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD);
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().setPayloadSerializer(serializer).build().validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD);
        });
    }

    @Test
    public void testShouldEitherHavePayloadOrUserFields() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(Schema.of(new Schema.Field[]{Schema.Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE)}));
        });
    }

    @Test
    public void testShouldNotAllowPayloadFieldWithUserFields() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_BYTES_PAYLOAD));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            PubsubRowToMessage.builder().build().validateSerializableFields(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_ROW_PAYLOAD));
        });
    }

    @Test
    public void testSchemaReflection_matchesAll() {
        PubsubRowToMessage.SchemaReflection of = PubsubRowToMessage.SchemaReflection.of(ALL_DATA_TYPES_SCHEMA);
        Assert.assertTrue(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName())}));
        Assert.assertTrue(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName(), Schema.FieldType.BOOLEAN), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName(), Schema.FieldType.BYTE), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName(), Schema.FieldType.DATETIME), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName(), Schema.FieldType.DECIMAL), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName(), Schema.FieldType.DOUBLE), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName(), Schema.FieldType.FLOAT), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName(), Schema.FieldType.INT16), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName(), Schema.FieldType.INT32), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName(), Schema.FieldType.INT64), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName(), Schema.FieldType.STRING)}));
        Assert.assertTrue(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName(), Schema.TypeName.BOOLEAN), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName(), Schema.TypeName.BYTE), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName(), Schema.TypeName.DATETIME), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName(), Schema.TypeName.DECIMAL), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName(), Schema.TypeName.DOUBLE), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName(), Schema.TypeName.FLOAT), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName(), Schema.TypeName.INT16), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName(), Schema.TypeName.INT32), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName(), Schema.TypeName.INT64), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName(), Schema.TypeName.STRING)}));
        Assert.assertFalse(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of("idontexist"), PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName()), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName())}));
        Assert.assertFalse(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName(), Schema.FieldType.BOOLEAN), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName(), Schema.FieldType.BYTE), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName(), Schema.FieldType.DATETIME), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName(), Schema.FieldType.DECIMAL), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName(), Schema.FieldType.DOUBLE), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName(), Schema.FieldType.FLOAT), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName(), Schema.FieldType.INT16), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName(), Schema.FieldType.INT32), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName(), Schema.FieldType.INT64), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName(), Schema.FieldType.BYTE)}));
        Assert.assertFalse(of.matchesAll(new PubsubRowToMessage.FieldMatcher[]{PubsubRowToMessage.FieldMatcher.of(BOOLEAN_FIELD.getName(), Schema.TypeName.BOOLEAN), PubsubRowToMessage.FieldMatcher.of(BYTE_FIELD.getName(), Schema.TypeName.BYTE), PubsubRowToMessage.FieldMatcher.of(DATETIME_FIELD.getName(), Schema.TypeName.DATETIME), PubsubRowToMessage.FieldMatcher.of(DECIMAL_FIELD.getName(), Schema.TypeName.DECIMAL), PubsubRowToMessage.FieldMatcher.of(DOUBLE_FIELD.getName(), Schema.TypeName.DOUBLE), PubsubRowToMessage.FieldMatcher.of(FLOAT_FIELD.getName(), Schema.TypeName.FLOAT), PubsubRowToMessage.FieldMatcher.of(INT16_FIELD.getName(), Schema.TypeName.INT16), PubsubRowToMessage.FieldMatcher.of(INT32_FIELD.getName(), Schema.TypeName.INT32), PubsubRowToMessage.FieldMatcher.of(INT64_FIELD.getName(), Schema.TypeName.INT64), PubsubRowToMessage.FieldMatcher.of(STRING_FIELD.getName(), Schema.TypeName.INT16)}));
    }

    @Test
    public void testFieldMatcher_match_NameOnly() {
        PubsubRowToMessage.FieldMatcher of = PubsubRowToMessage.FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME);
        Assert.assertTrue(of.match(NON_USER_WITH_ROW_PAYLOAD));
        Assert.assertTrue(of.match(NON_USER_WITH_BYTES_PAYLOAD));
        Assert.assertFalse(of.match(ALL_DATA_TYPES_SCHEMA));
    }

    @Test
    public void testFieldMatcher_match_TypeName() {
        Assert.assertTrue(PubsubRowToMessage.FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.TypeName.ROW).match(NON_USER_WITH_ROW_PAYLOAD));
        Assert.assertTrue(PubsubRowToMessage.FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.TypeName.BYTES).match(NON_USER_WITH_BYTES_PAYLOAD));
        Assert.assertFalse(PubsubRowToMessage.FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.TypeName.ROW).match(NON_USER_WITH_BYTES_PAYLOAD));
        Assert.assertFalse(PubsubRowToMessage.FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, Schema.TypeName.BYTES).match(NON_USER_WITH_ROW_PAYLOAD));
    }

    @Test
    public void testFieldMatcher_match_FieldType() {
        Assert.assertTrue(PubsubRowToMessage.FieldMatcher.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)).match(NON_USER_WITH_BYTES_PAYLOAD));
        Assert.assertFalse(PubsubRowToMessage.FieldMatcher.of(DEFAULT_ATTRIBUTES_KEY_NAME, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.BYTES)).match(NON_USER_WITH_BYTES_PAYLOAD));
    }

    @Test
    public void testRemoveFields() {
        Assert.assertEquals(Schema.builder().build(), PubsubRowToMessage.removeFields(Schema.of(new Schema.Field[]{STRING_FIELD, BOOLEAN_FIELD, INT64_FIELD}), new String[]{STRING_FIELD.getName(), BOOLEAN_FIELD.getName(), INT64_FIELD.getName()}));
    }

    @Test
    public void testPubsubRowToMessageParDo_attributesWithoutTimestamp() {
        HashMap hashMap = new HashMap();
        hashMap.put("a", "1");
        hashMap.put("b", "2");
        hashMap.put("c", "3");
        Assert.assertEquals(hashMap, doFn(NON_USER_WITH_BYTES_PAYLOAD, null).attributesWithoutTimestamp(Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD).addValues(new Object[]{hashMap, Instant.now(), new byte[0]}).build()));
        Schema removeFields = PubsubRowToMessage.removeFields(NON_USER_WITH_BYTES_PAYLOAD, new String[]{DEFAULT_ATTRIBUTES_KEY_NAME});
        Assert.assertEquals(ImmutableMap.of(), doFn(removeFields, null).attributesWithoutTimestamp(Row.withSchema(removeFields).addValues(new Object[]{Instant.now(), new byte[0]}).build()));
    }

    @Test
    public void testPubsubRowToMessageParDo_timestamp() {
        Instant now = Instant.now();
        Assert.assertEquals(now.toString(), doFn(NON_USER_WITH_BYTES_PAYLOAD, null).timestamp(Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD).addValues(new Object[]{ImmutableMap.of(), now, new byte[0]}).build()).toString());
        Schema removeFields = PubsubRowToMessage.removeFields(NON_USER_WITH_BYTES_PAYLOAD, new String[]{DEFAULT_EVENT_TIMESTAMP_KEY_NAME});
        Assert.assertNotNull(doFn(removeFields, null).timestamp(Row.withSchema(removeFields).addValues(new Object[]{ImmutableMap.of(), new byte[0]}).build()));
    }

    @Test
    public void testPubsubRowToMessageParDo_payload() {
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        PayloadSerializer serializer2 = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        byte[] bytes = "abcdefg".getBytes(StandardCharsets.UTF_8);
        Assert.assertArrayEquals(bytes, doFn(NON_USER_WITH_BYTES_PAYLOAD, null).payload(Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD).addValues(new Object[]{ImmutableMap.of(), Instant.now(), bytes}).build()));
        Row build = Row.withSchema(NON_USER_WITH_ROW_PAYLOAD).addValues(new Object[]{ImmutableMap.of(), Instant.now(), rowWithAllDataTypes(true, (byte) 1, Instant.now().toDateTime(), BigDecimal.valueOf(100L), Double.valueOf(1.12345d), Float.valueOf(1.1f), (short) 1, 1, 1L, "abcdefg")}).build();
        Assert.assertArrayEquals(serializer.serialize(build.getRow(DEFAULT_PAYLOAD_KEY_NAME)), doFn(NON_USER_WITH_ROW_PAYLOAD, serializer).payload(build));
        Assert.assertArrayEquals(serializer2.serialize(build.getRow(DEFAULT_PAYLOAD_KEY_NAME)), doFn(NON_USER_WITH_ROW_PAYLOAD, serializer2).payload(build));
        HashMap hashMap = new HashMap();
        hashMap.put("a", "1");
        hashMap.put("b", "2");
        hashMap.put("c", "3");
        Row build2 = Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(new Object[]{hashMap, Instant.now()}).build();
        Row rowWithAllDataTypes = rowWithAllDataTypes(false, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(200L), Double.valueOf(2.12345d), Float.valueOf(2.1f), (short) 2, 2, 2L, "1234567");
        Row merge = merge(build2, rowWithAllDataTypes);
        Assert.assertArrayEquals(serializer.serialize(rowWithAllDataTypes), doFn(merge.getSchema(), serializer).payload(merge));
        Assert.assertArrayEquals(serializer2.serialize(rowWithAllDataTypes), doFn(merge.getSchema(), serializer2).payload(merge));
    }

    @Test
    public void testPubsubRowToMessageParDo_serializableRow() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            doFn(NON_USER_WITH_BYTES_PAYLOAD, null).serializableRow(Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD).attachValues(new Object[]{ImmutableMap.of(), Instant.now(), new byte[0]}));
        });
        HashMap hashMap = new HashMap();
        hashMap.put("a", "1");
        hashMap.put("b", "2");
        hashMap.put("c", "3");
        Row rowWithAllDataTypes = rowWithAllDataTypes(true, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(3.12345d), Float.valueOf(4.1f), (short) 5, 2, 7L, "asdfjkl;");
        Row attachValues = Row.withSchema(NON_USER_WITH_ROW_PAYLOAD).attachValues(new Object[]{hashMap, Instant.now(), rowWithAllDataTypes});
        PayloadSerializer serializer = new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
        Assert.assertEquals(rowWithAllDataTypes, doFn(NON_USER_WITH_ROW_PAYLOAD, serializer).serializableRow(attachValues));
        Row merge = merge(Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(new Object[]{hashMap, Instant.now()}).build(), rowWithAllDataTypes);
        Assert.assertEquals(rowWithAllDataTypes, doFn(merge.getSchema(), serializer).serializableRow(merge));
    }

    private static PubsubRowToMessage.PubsubRowToMessageDoFn doFn(Schema schema, PayloadSerializer payloadSerializer) {
        return new PubsubRowToMessage.PubsubRowToMessageDoFn(DEFAULT_ATTRIBUTES_KEY_NAME, DEFAULT_EVENT_TIMESTAMP_KEY_NAME, DEFAULT_PAYLOAD_KEY_NAME, PubsubRowToMessage.errorSchema(schema), DEFAULT_EVENT_TIMESTAMP_KEY_NAME, (Instant) null, payloadSerializer);
    }

    private static Row rowWithAllDataTypes(boolean z, byte b, ReadableDateTime readableDateTime, BigDecimal bigDecimal, Double d, Float f, Short sh, Integer num, Long l, String str) {
        return Row.withSchema(ALL_DATA_TYPES_SCHEMA).addValues(new Object[]{Boolean.valueOf(z), Byte.valueOf(b), readableDateTime, bigDecimal, d, f, sh, num, l, str}).build();
    }

    private static Schema merge(Schema schema, Schema schema2) {
        ArrayList arrayList = new ArrayList(schema.getFields());
        arrayList.addAll(schema2.getFields());
        Schema.Builder builder = Schema.builder();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            builder = builder.addField((Schema.Field) it.next());
        }
        return builder.build();
    }

    private static Row merge(Row row, Row row2) {
        Schema merge = merge(row.getSchema(), row2.getSchema());
        HashMap hashMap = new HashMap();
        for (String str : row.getSchema().getFieldNames()) {
            hashMap.put(str, row.getValue(str));
        }
        for (String str2 : row2.getSchema().getFieldNames()) {
            hashMap.put(str2, row2.getValue(str2));
        }
        return Row.withSchema(merge).withFieldValues(hashMap).build();
    }

    static {
        PIPELINE_OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
    }
}
