/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class PubsubMessageToRowTest
implements Serializable {
    @Rule
    public transient @UnknownKeyFor @NonNull @Initialized TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testConvertsMessages() {
        Schema payloadSchema = Schema.builder().addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addMapField("attributes", PubsubSchemaIOProvider.VARCHAR, PubsubSchemaIOProvider.VARCHAR).addRowField("payload", payloadSchema).build();
        PCollectionTuple rows = (PCollectionTuple)((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"), this.message(3, this.map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"), this.message(4, this.map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }"), this.message(4, this.map("dttr", "vdl"), "{ \"name\" : null, \"id\" : null }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(false).useFlatSchema(false).build());
        PAssert.that((PCollection)rows.get(PubsubMessageToRow.MAIN_TAG)).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(1L), this.map("attr", "val"), this.row(payloadSchema, 3, "foo")}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(2L), this.map("bttr", "vbl"), this.row(payloadSchema, 5, "baz")}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(3L), this.map("cttr", "vcl"), this.row(payloadSchema, 7, "bar")}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), this.map("dttr", "vdl"), this.row(payloadSchema, 8, "qaz")}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), this.map("dttr", "vdl"), this.row(payloadSchema, null, null)}).build()});
        this.pipeline.run();
    }

    @Test
    public void testSendsInvalidToDLQ() {
        Schema payloadSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addMapField("attributes", PubsubSchemaIOProvider.VARCHAR, PubsubSchemaIOProvider.VARCHAR).addRowField("payload", payloadSchema).build();
        PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr1", "val1"), "{ \"invalid1\" : \"sdfsd\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("attr2", "val2"), "{ \"invalid2"), this.message(3, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), this.message(4, this.map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(true).useFlatSchema(false).build());
        PCollection rows = outputs.get(PubsubMessageToRow.MAIN_TAG);
        PCollection dlqMessages = outputs.get(PubsubMessageToRow.DLQ_TAG);
        PAssert.that((PCollection)dlqMessages).satisfies((SerializableFunction & Serializable)messages -> {
            Assert.assertEquals((long)2L, (long)Iterables.size((Iterable)messages));
            Assert.assertEquals((Object)ImmutableSet.of(this.map("attr1", "val1"), this.map("attr2", "val2")), PubsubMessageToRowTest.convertToSet(messages, PubsubMessage::getAttributeMap));
            Assert.assertEquals((Object)ImmutableSet.of((Object)"{ \"invalid1\" : \"sdfsd\" }", (Object)"{ \"invalid2"), PubsubMessageToRowTest.convertToSet(messages, m -> new String(m.getPayload(), StandardCharsets.UTF_8)));
            return null;
        });
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(3L), this.map("attr", "val"), this.row(payloadSchema, 3, "foo")}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), this.map("bttr", "vbl"), this.row(payloadSchema, 5, "baz")}).build()});
        this.pipeline.run();
    }

    @Test
    public void testConvertsMessagesToFlatRow() {
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
        PCollectionTuple rows = (PCollectionTuple)((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"), this.message(3, this.map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"), this.message(4, this.map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }"), this.message(4, this.map("dttr", "vdl"), "{ \"name\" : null, \"id\" : null }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(false).useFlatSchema(true).build());
        PAssert.that((PCollection)rows.get(PubsubMessageToRow.MAIN_TAG)).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(1L), 3, "foo"}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(2L), 5, "baz"}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(3L), 7, "bar"}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), 8, "qaz"}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), null, null}).build()});
        this.pipeline.run();
    }

    @Test
    public void testSendsFlatRowInvalidToDLQ() {
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addInt32Field("id").addStringField("name").build();
        PCollectionTuple outputs = (PCollectionTuple)((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr1", "val1"), "{ \"invalid1\" : \"sdfsd\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("attr2", "val2"), "{ \"invalid2"), this.message(3, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), this.message(4, this.map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(true).useFlatSchema(true).build());
        PCollection rows = outputs.get(PubsubMessageToRow.MAIN_TAG);
        PCollection dlqMessages = outputs.get(PubsubMessageToRow.DLQ_TAG);
        PAssert.that((PCollection)dlqMessages).satisfies((SerializableFunction & Serializable)messages -> {
            Assert.assertEquals((long)2L, (long)Iterables.size((Iterable)messages));
            Assert.assertEquals((Object)ImmutableSet.of(this.map("attr1", "val1"), this.map("attr2", "val2")), PubsubMessageToRowTest.convertToSet(messages, PubsubMessage::getAttributeMap));
            Assert.assertEquals((Object)ImmutableSet.of((Object)"{ \"invalid1\" : \"sdfsd\" }", (Object)"{ \"invalid2"), PubsubMessageToRowTest.convertToSet(messages, m -> new String(m.getPayload(), StandardCharsets.UTF_8)));
            return null;
        });
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(3L), 3, "foo"}).build(), Row.withSchema((Schema)messageSchema).addValues(new Object[]{this.ts(4L), 5, "baz"}).build()});
        this.pipeline.run();
    }

    @Test
    public void testFlatSchemaMessageInvalidElement() {
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addInt32Field("id").addStringField("name").build();
        ((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("attr1", "val1"), "{ \"invalid1\" : \"sdfsd\" }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(false).useFlatSchema(true).build());
        Exception exception = (Exception)Assert.assertThrows(RuntimeException.class, () -> this.pipeline.run());
        Assert.assertTrue((boolean)exception.getMessage().contains("Error parsing message"));
    }

    @Test
    public void testNestedSchemaMessageInvalidElement() {
        Schema payloadSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        Schema messageSchema = Schema.builder().addDateTimeField("event_timestamp").addMapField("attributes", PubsubSchemaIOProvider.VARCHAR, PubsubSchemaIOProvider.VARCHAR).addRowField("payload", payloadSchema).build();
        ((PCollection)this.pipeline.apply("create", (PTransform)Create.timestamped(this.message(1, this.map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"), (TimestampedValue[])new TimestampedValue[]{this.message(2, this.map("attr1", "val1"), "{ \"invalid1\" : \"sdfsd\" }")}))).apply("convert", (PTransform)PubsubMessageToRow.builder().messageSchema(messageSchema).useDlq(false).useFlatSchema(false).build());
        Exception exception = (Exception)Assert.assertThrows(RuntimeException.class, () -> this.pipeline.run());
        Assert.assertTrue((boolean)exception.getMessage().contains("Error parsing message"));
    }

    @Test
    public void testParsesAvroPayload() {
        Schema payloadSchema = this.getParserSchema();
        Row row = this.row(payloadSchema, 3, "Dovahkiin", 5.5, 5L);
        byte[] payload = (byte[])AvroUtils.getRowToAvroBytesFunction((Schema)payloadSchema).apply((Object)row);
        SimpleFunction messageToRowFn = PubsubMessageToRow.getParsePayloadFn((PubsubSchemaIOProvider.PayloadFormat)PubsubSchemaIOProvider.PayloadFormat.AVRO, (Schema)payloadSchema);
        Row parsedRow = (Row)messageToRowFn.apply((Object)new PubsubMessage(payload, null));
        Assert.assertEquals((Object)row, (Object)parsedRow);
    }

    private @UnknownKeyFor @NonNull @Initialized Schema getParserSchema() {
        return Schema.builder().addInt32Field("id").addNullableField("name", Schema.FieldType.STRING).addNullableField("real", Schema.FieldType.DOUBLE).addNullableField("number", Schema.FieldType.INT64).build();
    }

    private @UnknownKeyFor @NonNull @Initialized Row row(@UnknownKeyFor @NonNull @Initialized Schema schema, Object ... objects) {
        return Row.withSchema((Schema)schema).addValues(objects).build();
    }

    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> map(@UnknownKeyFor @NonNull @Initialized String attr, @UnknownKeyFor @NonNull @Initialized String val) {
        return ImmutableMap.of((Object)attr, (Object)val);
    }

    private @UnknownKeyFor @NonNull @Initialized TimestampedValue<@UnknownKeyFor @NonNull @Initialized PubsubMessage> message(@UnknownKeyFor @NonNull @Initialized int timestamp, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> attributes, @UnknownKeyFor @NonNull @Initialized String payload) {
        return TimestampedValue.of((Object)new PubsubMessage(payload.getBytes(StandardCharsets.UTF_8), attributes), (Instant)this.ts(timestamp));
    }

    private @UnknownKeyFor @NonNull @Initialized Instant ts(@UnknownKeyFor @NonNull @Initialized long epochMills) {
        return new DateTime(epochMills).toInstant();
    }

    private static <V> @UnknownKeyFor @NonNull @Initialized Set<V> convertToSet(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized PubsubMessage> messages, @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @Nullable @Initialized ? super @UnknownKeyFor @NonNull @Initialized PubsubMessage, V> mapper) {
        return StreamSupport.stream(messages.spliterator(), false).map(mapper).collect(Collectors.toSet());
    }
}

