package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import org.apache.avro.SchemaParseException;
import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.class */
public class PubsubWriteSchemaTransformProviderTest {
    /** Attribute name used by {@code testBuildPubsubWrite} for both the configuration and the expected write. */
    private static final String ID_ATTRIBUTE = "id_attribute";
    /** Topic path set on every configuration from {@code configurationBuilder()} and on the expected write from {@code pubsubWrite()}. */
    private static final String TOPIC = "projects/project/topics/topic";
    /** Clock frozen at the instant captured when this class is initialized; passed to {@code convertForRowToMessage} in the conversion test. */
    private static final MockClock CLOCK = new MockClock(Instant.now());
    /** Schema provider used only to derive {@link #TO_ROW}. */
    private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
    /** Type descriptor of the configuration class, used only to derive {@link #TO_ROW}. */
    private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR = TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
    /** Converts a built configuration into the {@link Row} that {@code transform(...)} hands to the provider. */
    private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> TO_ROW = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
    /** Options shared by every {@code pipeline.run(OPTIONS)} and schema-validation call; adjusted once in the static initializer. */
    private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();

    /** Pipeline under test; the tests that apply transforms to it finish with {@code pipeline.run(OPTIONS)}. */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest$MockClock.class */
    private static class MockClock implements Clock, Serializable {
        private final Long millis;

        private MockClock(Instant instant) {
            this.millis = Long.valueOf(instant.getMillis());
        }

        public long currentTimeMillis() {
            return this.millis.longValue();
        }
    }

    /**
     * Checks that {@code buildPubsubWrite()} equals a plain topic write for the default
     * configuration, and a write with the id attribute set when one is configured.
     */
    @Test
    public void testBuildPubsubWrite() {
        PubsubIO.Write<PubsubMessage> expectedDefault = pubsubWrite();
        Assert.assertEquals(
            "default configuration should yield a topic Pub/Sub write",
            expectedDefault,
            transform(configurationBuilder()).buildPubsubWrite());

        PubsubIO.Write<PubsubMessage> expectedWithId = pubsubWrite().withIdAttribute(ID_ATTRIBUTE);
        PubsubWriteSchemaTransformConfiguration.Builder withIdAttribute =
            configurationBuilder().setIdAttribute(ID_ATTRIBUTE);
        Assert.assertEquals(
            "idAttribute in configuration should yield a idAttribute set Pub/Sub write",
            expectedWithId,
            transform(withIdAttribute).buildPubsubWrite());
    }

    /**
     * Exercises {@code buildPubsubRowToMessage}: the timestamp attribute override, the null
     * payload serializer when no format is set, the exceptions raised for a schema the chosen
     * format cannot handle, and the non-null serializer for a supported schema.
     */
    @Test
    public void testBuildPubsubRowToMessage() {
        String timestampAttribute = "custom_timestamp_attribute";
        Schema allTypes = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;
        // Schema holding only the attributes field; used for both "unsupported" cases below.
        Schema attributesOnly =
            Schema.of(Schema.Field.of("$pubsub_attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE));

        PubsubWriteSchemaTransformConfiguration.Builder withTimestampTarget =
            configurationBuilder()
                .setTarget(
                    PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder()
                        .setTimestampAttributeKey(timestampAttribute)
                        .build());
        Assert.assertEquals(
            "override timestamp attribute on configuration should yield a PubsubRowToMessage with target timestamp",
            rowToMessageBuilder().setTargetTimestampAttributeName(timestampAttribute).build(),
            transform(withTimestampTarget)
                .buildPubsubRowToMessage(PubsubRowToMessageTest.NON_USER_WITH_BYTES_PAYLOAD));

        Assert.assertNull(
            "failing to set format should yield a null payload serializer",
            transform(configurationBuilder()).buildPubsubRowToMessage(allTypes).getPayloadSerializer());

        Assert.assertThrows(
            "setting 'json' format for a unsupported field containing Schema should throw an Exception",
            RowJson.UnsupportedRowJsonException.class,
            () -> transform(configurationBuilder().setFormat("json")).buildPubsubRowToMessage(attributesOnly));

        Assert.assertThrows(
            "setting 'avro' format for a unsupported field containing Schema should throw an Exception",
            SchemaParseException.class,
            () -> transform(configurationBuilder().setFormat("avro")).buildPubsubRowToMessage(attributesOnly));

        Assert.assertNotNull(
            "setting 'json' format for valid schema should yield PayloadSerializer",
            transform(configurationBuilder().setFormat("json"))
                .buildPubsubRowToMessage(allTypes)
                .getPayloadSerializer());

        Assert.assertNotNull(
            "setting 'avro' format for valid schema should yield PayloadSerializer",
            transform(configurationBuilder().setFormat("avro"))
                .buildPubsubRowToMessage(allTypes)
                .getPayloadSerializer());
    }

    /**
     * Verifies that {@code expand} rejects every input tuple that is not exactly one collection
     * under the tag {@code "input"}: an empty tuple, two tagged collections, and a single
     * collection under a different tag.
     */
    @Test
    public void testInvalidTaggedInput() {
        // Typed as PCollection<Row> (not a raw PCollection) so the PCollectionRowTuple calls
        // below are checked by the compiler.
        PCollection<Row> rows =
            this.pipeline
                .apply(Create.of(PubsubRowToMessageTest.rowWithAllDataTypes(true, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(3.12345d), Float.valueOf(4.1f), (short) 5, 2, 7L, "asdfjkl;")))
                .setRowSchema(PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA);

        Assert.assertThrows(
            "empty input should not be allowed",
            IllegalArgumentException.class,
            () -> transform(configurationBuilder()).expand(PCollectionRowTuple.empty(this.pipeline)));

        Assert.assertThrows(
            "input with >1 tagged rows should not be allowed",
            IllegalArgumentException.class,
            () ->
                transform(configurationBuilder())
                    .expand(PCollectionRowTuple.of("input", rows).and("somethingelse", rows)));

        Assert.assertThrows(
            "input missing INPUT tag should not be allowed",
            IllegalArgumentException.class,
            () -> transform(configurationBuilder()).expand(PCollectionRowTuple.of("somethingelse", rows)));

        // The Create above was applied to the TestPipeline, so the pipeline is still run.
        this.pipeline.run(OPTIONS);
    }

    /**
     * Exercises {@code validateSourceSchemaAgainstConfiguration}.
     *
     * <p>The first three calls must complete without throwing: the default configuration, and a
     * fully named source configuration against a bytes payload and a row payload. Every
     * remaining case must raise {@link IllegalArgumentException}: an empty schema, a configured
     * field name that is absent from the schema, and a configured field whose schema type does
     * not match.
     */
    @Test
    public void testValidateSourceSchemaAgainstConfiguration() {
        Schema allTypes = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;

        // Valid: default configuration against the user schema.
        transform(configurationBuilder()).validateSourceSchemaAgainstConfiguration(allTypes);

        // Valid: all three source fields named, with a bytes payload and then a row payload.
        // transform(...) builds a fresh configuration from this builder on each call.
        PubsubWriteSchemaTransformConfiguration.Builder allSourceFieldsNamed =
            configurationBuilder()
                .setSource(
                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                        .setAttributesFieldName("attributes")
                        .setTimestampFieldName("timestamp")
                        .setPayloadFieldName("payload")
                        .build());
        transform(allSourceFieldsNamed)
            .validateSourceSchemaAgainstConfiguration(
                Schema.of(
                    Schema.Field.of("attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE),
                    Schema.Field.of("timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE),
                    Schema.Field.of("payload", Schema.FieldType.BYTES)));
        transform(allSourceFieldsNamed)
            .validateSourceSchemaAgainstConfiguration(
                Schema.of(
                    Schema.Field.of("attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE),
                    Schema.Field.of("timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE),
                    Schema.Field.of("payload", Schema.FieldType.row(allTypes))));

        Assert.assertThrows(
            "empty Schema should be invalid",
            IllegalArgumentException.class,
            () -> transform(configurationBuilder()).validateSourceSchemaAgainstConfiguration(Schema.of()));

        // Configured field names that do not exist in the schema.
        Assert.assertThrows(
            "attributes field in configuration but not in schema should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setAttributesFieldName("attributes")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(allTypes));
        Assert.assertThrows(
            "timestamp field in configuration but not in schema should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setTimestampFieldName("timestamp")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(allTypes));
        Assert.assertThrows(
            "payload field in configuration but not in schema should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setPayloadFieldName("payload")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(allTypes));

        // Configured field names that exist in the schema but with the wrong type.
        Assert.assertThrows(
            "attributes field in configuration but mismatching attributes type should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setAttributesFieldName("attributes")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(
                        Schema.of(
                            Schema.Field.of(
                                "attributes",
                                Schema.FieldType.map(Schema.FieldType.BYTES, Schema.FieldType.STRING)))));
        // Uses setTimestampFieldName so the timestamp type check is the one exercised; the
        // earlier setAttributesFieldName("timestamp") only re-tested the attributes check.
        Assert.assertThrows(
            "timestamp field in configuration but mismatching timestamp type should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setTimestampFieldName("timestamp")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(
                        Schema.of(Schema.Field.of("timestamp", Schema.FieldType.STRING))));
        // Uses setPayloadFieldName so the payload type check is the one exercised; the earlier
        // setAttributesFieldName("payload") only re-tested the attributes check.
        Assert.assertThrows(
            "payload field in configuration but mismatching payload type should be invalid",
            IllegalArgumentException.class,
            () ->
                transform(
                        configurationBuilder()
                            .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                    .setPayloadFieldName("payload")
                                    .build()))
                    .validateSourceSchemaAgainstConfiguration(
                        Schema.of(Schema.Field.of("payload", Schema.FieldType.STRING))));
    }

    /**
     * Exercises {@code validateTargetSchemaAgainstPubsubSchema} against four test client
     * factories: no schema, a deleted schema, a mismatching schema and a matching schema. Only
     * the mismatching schema may raise, and it must raise {@link IllegalStateException}.
     *
     * <p>Each factory is opened in try-with-resources so it is closed even when the validation
     * or the assertion throws.
     *
     * @throws IOException if closing a test client factory fails
     */
    @Test
    public void testValidateTargetSchemaAgainstPubsubSchema() throws IOException {
        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath(TOPIC);
        Schema userSchema = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;

        // Factory built with a null schema path and a null schema: validation must not throw.
        try (PubsubTestClient.PubsubTestClientFactory noSchemaFactory =
            PubsubTestClient.createFactoryForGetSchema(topicPath, (PubsubClient.SchemaPath) null, (Schema) null)) {
            transform(configurationBuilder())
                .withPubsubClientFactory(noSchemaFactory)
                .validateTargetSchemaAgainstPubsubSchema(userSchema, OPTIONS);
        }

        // Factory built with the DELETED_SCHEMA path and a null schema: validation must not throw.
        try (PubsubTestClient.PubsubTestClientFactory deletedSchemaFactory =
            PubsubTestClient.createFactoryForGetSchema(topicPath, PubsubClient.SchemaPath.DELETED_SCHEMA, (Schema) null)) {
            transform(configurationBuilder())
                .withPubsubClientFactory(deletedSchemaFactory)
                .validateTargetSchemaAgainstPubsubSchema(userSchema, OPTIONS);
        }

        // Factory returning a single-string-field schema: differs from the user schema, so it must be rejected.
        try (PubsubTestClient.PubsubTestClientFactory mismatchFactory =
            PubsubTestClient.createFactoryForGetSchema(
                topicPath,
                PubsubClient.schemaPathFromId("testProject", "misMatch"),
                Schema.of(Schema.Field.of("StringField", Schema.FieldType.STRING)))) {
            Assert.assertThrows(
                "mismatched schema should be detected from Pub/Sub topic",
                IllegalStateException.class,
                () ->
                    transform(configurationBuilder())
                        .withPubsubClientFactory(mismatchFactory)
                        .validateTargetSchemaAgainstPubsubSchema(userSchema, OPTIONS));
        }

        // Factory returning exactly the user schema: validation must not throw.
        try (PubsubTestClient.PubsubTestClientFactory matchFactory =
            PubsubTestClient.createFactoryForGetSchema(
                topicPath, PubsubClient.schemaPathFromId("testProject", "match"), userSchema)) {
            transform(configurationBuilder())
                .withPubsubClientFactory(matchFactory)
                .validateTargetSchemaAgainstPubsubSchema(userSchema, OPTIONS);
        }
    }

    /**
     * Checks the schema produced by {@code buildTargetSchema} for each combination of configured
     * source field names. In every case the expected target schema starts with the
     * {@code $pubsub_attributes} and {@code $pubsub_event_timestamp} fields, followed either by
     * the user fields or by a {@code $pubsub_payload} field.
     */
    @Test
    public void testBuildTargetSchema() {
        Schema userSchema = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;

        // Fields as they are named in the source schemas.
        Schema.Field sourceAttributes = Schema.Field.of("attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE);
        Schema.Field sourceTimestamp = Schema.Field.of("timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE);
        Schema.Field sourceBytesPayload = Schema.Field.of("payload", Schema.FieldType.BYTES);
        Schema.Field sourceRowPayload = Schema.Field.of("payload", Schema.FieldType.row(userSchema));

        // Fields as they are expected to be named in the target schema.
        Schema.Field targetAttributes = Schema.Field.of("$pubsub_attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE);
        Schema.Field targetTimestamp = Schema.Field.of("$pubsub_event_timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE);
        Schema.Field targetBytesPayload = Schema.Field.of("$pubsub_payload", Schema.FieldType.BYTES);
        Schema.Field targetRowPayload = Schema.Field.of("$pubsub_payload", Schema.FieldType.row(userSchema));

        // The three distinct expected target schemas.
        Schema expectedWithUserFields =
            Schema.builder()
                .addField(targetAttributes)
                .addField(targetTimestamp)
                .addFields(userSchema.getFields())
                .build();
        Schema expectedWithBytesPayload =
            Schema.builder()
                .addField(targetAttributes)
                .addField(targetTimestamp)
                .addField(targetBytesPayload)
                .build();
        Schema expectedWithRowPayload =
            Schema.builder()
                .addField(targetAttributes)
                .addField(targetTimestamp)
                .addField(targetRowPayload)
                .build();

        Assert.assertEquals(
            "attributes and timestamp field should append to user fields",
            expectedWithUserFields,
            transform(
                    configurationBuilder()
                        .setSource(PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder().build()))
                .buildTargetSchema(userSchema));

        Assert.assertEquals(
            "timestamp field should append to user fields; attributes field name changed",
            expectedWithUserFields,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setAttributesFieldName("attributes")
                                .build()))
                .buildTargetSchema(
                    Schema.builder().addField(sourceAttributes).addFields(userSchema.getFields()).build()));

        Assert.assertEquals(
            "attributes field should append to user fields; timestamp field name changed",
            expectedWithUserFields,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setTimestampFieldName("timestamp")
                                .build()))
                .buildTargetSchema(
                    Schema.builder().addField(sourceTimestamp).addFields(userSchema.getFields()).build()));

        Assert.assertEquals(
            "attributes and timestamp field appended to user payload bytes field; payload field name changed",
            expectedWithBytesPayload,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setPayloadFieldName("payload")
                                .build()))
                .buildTargetSchema(Schema.builder().addField(sourceBytesPayload).build()));

        Assert.assertEquals(
            "attributes and timestamp field appended to user payload row field; payload field name changed",
            expectedWithRowPayload,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setPayloadFieldName("payload")
                                .build()))
                .buildTargetSchema(Schema.builder().addField(sourceRowPayload).build()));

        Assert.assertEquals(
            "attributes and timestamp fields name changed",
            expectedWithUserFields,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setAttributesFieldName("attributes")
                                .setTimestampFieldName("timestamp")
                                .build()))
                .buildTargetSchema(
                    Schema.builder()
                        .addField(sourceAttributes)
                        .addField(sourceTimestamp)
                        .addFields(userSchema.getFields())
                        .build()));

        Assert.assertEquals(
            "attributes, timestamp, payload bytes fields name changed",
            expectedWithBytesPayload,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setAttributesFieldName("attributes")
                                .setTimestampFieldName("timestamp")
                                .setPayloadFieldName("payload")
                                .build()))
                .buildTargetSchema(
                    Schema.builder()
                        .addField(sourceAttributes)
                        .addField(sourceTimestamp)
                        .addField(sourceBytesPayload)
                        .build()));

        Assert.assertEquals(
            "attributes, timestamp, payload row fields name changed",
            expectedWithRowPayload,
            transform(
                    configurationBuilder()
                        .setSource(
                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                .setAttributesFieldName("attributes")
                                .setTimestampFieldName("timestamp")
                                .setPayloadFieldName("payload")
                                .build()))
                .buildTargetSchema(
                    Schema.builder()
                        .addField(sourceAttributes)
                        .addField(sourceTimestamp)
                        .addField(sourceRowPayload)
                        .build()));
    }

    /**
     * Runs the transform returned by {@code convertForRowToMessage} over six single-row inputs
     * and asserts the converted rows: attributes only, timestamp only, attributes plus
     * timestamp, bytes payload only, row payload only, and plain user fields.
     *
     * <p>Expected rows that carry no source timestamp use the instant reported by {@link #CLOCK};
     * expected rows that carry no source attributes use an empty map.
     */
    @Test
    public void testConvertForRowToMessageTransform() {
        String text = "吃葡萄不吐葡萄皮，不吃葡萄倒吐葡萄皮";
        Schema userSchema = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;
        Row userRow = PubsubRowToMessageTest.rowWithAllDataTypes(false, (byte) 0, Instant.ofEpochMilli(CLOCK.currentTimeMillis()).toDateTime(), BigDecimal.valueOf(1L), Double.valueOf(1.12345d), Float.valueOf(1.1f), (short) 1, 1, 1L, text);

        // Source-side fields and their target-side ($pubsub_*) counterparts.
        Schema.Field sourceAttributes = Schema.Field.of("attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE);
        Schema.Field targetAttributes = Schema.Field.of("$pubsub_attributes", PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE);
        Schema.Field sourceTimestamp = Schema.Field.of("timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE);
        Schema.Field targetTimestamp = Schema.Field.of("$pubsub_event_timestamp", PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE);
        Schema.Field sourceBytesPayload = Schema.Field.of("payload", Schema.FieldType.BYTES);
        Schema.Field targetBytesPayload = Schema.Field.of("$pubsub_payload", Schema.FieldType.BYTES);
        Schema.Field sourceRowPayload = Schema.Field.of("payload", Schema.FieldType.row(userSchema));
        Schema.Field targetRowPayload = Schema.Field.of("$pubsub_payload", Schema.FieldType.row(userSchema));

        // Values placed in the input rows and expected back in the output rows.
        ImmutableMap<String, String> attributes = ImmutableMap.of("a", "1");
        Instant clockInstant = Instant.ofEpochMilli(CLOCK.currentTimeMillis());
        Instant laterInstant = Instant.ofEpochMilli(CLOCK.currentTimeMillis() + 10000);
        byte[] payloadBytes = text.getBytes(StandardCharsets.UTF_8);

        // Schemas shared between the scenarios below.
        Schema attributesOnly = Schema.of(sourceAttributes);
        Schema timestampOnly = Schema.of(sourceTimestamp);
        Schema attributesAndTimestamp = Schema.of(sourceAttributes, sourceTimestamp);
        Schema bytesPayloadOnly = Schema.of(sourceBytesPayload);
        Schema rowPayloadOnly = Schema.of(sourceRowPayload);
        Schema targetBase = Schema.of(targetAttributes, targetTimestamp);
        Schema targetWithBytes = Schema.of(targetAttributes, targetTimestamp, targetBytesPayload);
        Schema targetWithRow = Schema.of(targetAttributes, targetTimestamp, targetRowPayload);
        Schema targetWithUserFields =
            Schema.builder()
                .addField(targetAttributes)
                .addField(targetTimestamp)
                .addFields(userSchema.getFields())
                .build();

        // 1. Attributes only.
        PCollection<Row> fromAttributes =
            this.pipeline
                .apply(Create.of(Row.withSchema(attributesOnly).attachValues(attributes)))
                .setRowSchema(attributesOnly)
                .apply(
                    transform(
                            configurationBuilder()
                                .setSource(
                                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setAttributesFieldName(sourceAttributes.getName())
                                        .build()))
                        .convertForRowToMessage(targetBase, CLOCK))
                .setRowSchema(targetBase);
        PAssert.that("attributes only source yields attributes + timestamp target", fromAttributes)
            .containsInAnyOrder(Row.withSchema(targetBase).attachValues(attributes, clockInstant));

        // 2. Timestamp only.
        PCollection<Row> fromTimestamp =
            this.pipeline
                .apply(Create.of(Row.withSchema(timestampOnly).attachValues(laterInstant)))
                .setRowSchema(timestampOnly)
                .apply(
                    transform(
                            configurationBuilder()
                                .setSource(
                                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setTimestampFieldName(sourceTimestamp.getName())
                                        .build()))
                        .convertForRowToMessage(targetBase, CLOCK))
                .setRowSchema(targetBase);
        PAssert.that("timestamp only source yields attributes + timestamp target", fromTimestamp)
            .containsInAnyOrder(Row.withSchema(targetBase).attachValues(ImmutableMap.of(), laterInstant));

        // 3. Attributes and timestamp.
        PCollection<Row> fromAttributesAndTimestamp =
            this.pipeline
                .apply(Create.of(Row.withSchema(attributesAndTimestamp).attachValues(attributes, laterInstant)))
                .setRowSchema(attributesAndTimestamp)
                .apply(
                    transform(
                            configurationBuilder()
                                .setSource(
                                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setAttributesFieldName(sourceAttributes.getName())
                                        .setTimestampFieldName(sourceTimestamp.getName())
                                        .build()))
                        .convertForRowToMessage(targetBase, CLOCK))
                .setRowSchema(targetBase);
        PAssert.that("timestamp and attributes source yields renamed fields in target", fromAttributesAndTimestamp)
            .containsInAnyOrder(Row.withSchema(targetBase).attachValues(attributes, laterInstant));

        // 4. Bytes payload only.
        PCollection<Row> fromBytesPayload =
            this.pipeline
                .apply(
                    Create.of(
                        Row.withSchema(bytesPayloadOnly)
                            .withFieldValue(sourceBytesPayload.getName(), payloadBytes)
                            .build()))
                .setRowSchema(bytesPayloadOnly)
                .apply(
                    transform(
                            configurationBuilder()
                                .setSource(
                                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setPayloadFieldName(sourceBytesPayload.getName())
                                        .build()))
                        .convertForRowToMessage(targetWithBytes, CLOCK))
                .setRowSchema(targetWithBytes);
        PAssert.that("bytes payload only source yields attributes + timestamp + renamed bytes payload target", fromBytesPayload)
            .containsInAnyOrder(
                Row.withSchema(targetWithBytes).attachValues(ImmutableMap.of(), clockInstant, payloadBytes));

        // 5. Row payload only.
        PCollection<Row> fromRowPayload =
            this.pipeline
                .apply(Create.of(Row.withSchema(rowPayloadOnly).attachValues(userRow)))
                .setRowSchema(rowPayloadOnly)
                .apply(
                    transform(
                            configurationBuilder()
                                .setSource(
                                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setPayloadFieldName(sourceRowPayload.getName())
                                        .build()))
                        .convertForRowToMessage(targetWithRow, CLOCK))
                .setRowSchema(targetWithRow);
        PAssert.that("row payload only source yields attributes + timestamp + renamed row payload target", fromRowPayload)
            .containsInAnyOrder(
                Row.withSchema(targetWithRow).attachValues(ImmutableMap.of(), clockInstant, userRow));

        // 6. Plain user fields with the default configuration.
        PCollection<Row> fromUserFields =
            this.pipeline
                .apply(Create.of(userRow))
                .setRowSchema(userSchema)
                .apply(transform(configurationBuilder()).convertForRowToMessage(targetWithUserFields, CLOCK))
                .setRowSchema(targetWithUserFields);
        PAssert.that("user only fields source yields attributes + timestamp + user fields target", fromUserFields)
            .containsInAnyOrder(
                Row.withSchema(targetWithUserFields)
                    .addValue(ImmutableMap.of())
                    .addValue(clockInstant)
                    .addValues(userRow.getValues())
                    .build());

        this.pipeline.run(OPTIONS);
    }

    /**
     * Checks that the serializer chosen for the {@code "json"} and {@code "avro"} formats
     * produces the same bytes as the corresponding payload serializer provider used directly.
     */
    @Test
    public void testGetPayloadSerializer() {
        Schema schema = PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;
        Row row = PubsubRowToMessageTest.rowWithAllDataTypes(false, (byte) 0, Instant.now().toDateTime(), BigDecimal.valueOf(-1L), Double.valueOf(-3.12345d), Float.valueOf(-4.1f), (short) -5, -2, -7L, "吃葡萄不吐葡萄皮，不吃葡萄倒吐葡萄皮");

        // All four serializations happen before either assertion, in provider/transform pairs.
        byte[] expectedJson =
            new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of()).serialize(row);
        byte[] actualJson =
            transform(configurationBuilder().setFormat("json")).getPayloadSerializer(schema).serialize(row);
        byte[] expectedAvro =
            new AvroPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of()).serialize(row);
        byte[] actualAvro =
            transform(configurationBuilder().setFormat("avro")).getPayloadSerializer(schema).serialize(row);

        Assert.assertArrayEquals(
            "configuration with json format should yield JSON PayloadSerializer", expectedJson, actualJson);
        Assert.assertArrayEquals(
            "configuration with avro format should yield Avro PayloadSerializer", expectedAvro, actualAvro);
    }

    /**
     * Starts a configuration builder with {@link #TOPIC} as the topic and a target configuration
     * built with no options set.
     *
     * @return a builder that individual tests customize further before calling {@code transform}
     */
    private static PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder() {
        PubsubWriteSchemaTransformConfiguration.Builder withTopic =
            PubsubWriteSchemaTransformConfiguration.builder().setTopic(TOPIC);
        return withTopic.setTarget(
            PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build());
    }

    /**
     * Supplies the builder used to construct the expected {@link PubsubRowToMessage}.
     *
     * @return the result of {@code PubsubRowToMessage.builder()}, unmodified
     */
    private static PubsubRowToMessage.Builder rowToMessageBuilder() {
        PubsubRowToMessage.Builder expectedBuilder = PubsubRowToMessage.builder();
        return expectedBuilder;
    }

    /**
     * Builds the expected Pub/Sub write: a message write targeting {@link #TOPIC}.
     *
     * @return {@code PubsubIO.writeMessages()} pointed at {@link #TOPIC}
     */
    private static PubsubIO.Write<PubsubMessage> pubsubWrite() {
        PubsubIO.Write<PubsubMessage> messageWrite = PubsubIO.writeMessages();
        return messageWrite.to(TOPIC);
    }

    /**
     * Builds the configuration, converts it to a {@link Row} with {@link #TO_ROW}, and asks a new
     * provider for the transform under test.
     *
     * @param builder configuration builder; {@code build()} is called on it here
     * @return the transform produced by the provider for that configuration row
     */
    private static PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform transform(PubsubWriteSchemaTransformConfiguration.Builder builder) {
        Row configurationRow = TO_ROW.apply(builder.build());
        return new PubsubWriteSchemaTransformProvider().from(configurationRow);
    }

    // Sets the stable-unique-names check to OFF on the options shared by every pipeline.run(OPTIONS) call.
    // NOTE(review): presumably needed because testConvertForRowToMessageTransform applies several
    // unnamed Create transforms to one pipeline — confirm.
    static {
        OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
    }
}
