/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for {@link PubsubMessageToRow}, the {@link DoFn} that turns {@link PubsubMessage}s with
 * JSON payloads into {@link Row}s shaped as {@code (event_timestamp, attributes, payload)}.
 *
 * <p>The class is {@link Serializable} and the pipeline rule is {@code transient} because the
 * function handed to {@code PAssert...satisfies(...)} calls the instance helper
 * {@link #map(String, String)} and therefore captures {@code this}.
 */
public class PubsubMessageToRowTest implements Serializable {
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /**
     * With the dead-letter queue disabled, every well-formed message must be converted.
     *
     * <p>The inputs cover both JSON key orders ({@code id} first and {@code name} first) as well
     * as explicit JSON {@code null}s, which are expected to surface as {@code null} values in the
     * nullable payload fields.
     */
    @Test
    public void testConvertsMessages() {
        Schema payloadSchema =
            Schema.builder()
                .addNullableField("id", Schema.FieldType.INT32)
                .addNullableField("name", Schema.FieldType.STRING)
                .build();
        Schema messageSchema = messageSchemaFor(payloadSchema);

        PCollection<Row> rows =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(2, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"),
                        message(3, map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : null, \"id\" : null }")))
                .apply(
                    "convert",
                    ParDo.of(
                        PubsubMessageToRow.builder()
                            .messageSchema(messageSchema)
                            .useDlq(false)
                            .build()));

        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(1L), map("attr", "val"), row(payloadSchema, 3, "foo")),
                row(messageSchema, ts(2L), map("bttr", "vbl"), row(payloadSchema, 5, "baz")),
                row(messageSchema, ts(3L), map("cttr", "vcl"), row(payloadSchema, 7, "bar")),
                row(messageSchema, ts(4L), map("dttr", "vdl"), row(payloadSchema, 8, "qaz")),
                row(messageSchema, ts(4L), map("dttr", "vdl"), row(payloadSchema, null, null)));

        pipeline.run();
    }

    /**
     * With the dead-letter queue enabled, messages whose payload cannot be converted must be
     * emitted unchanged (attributes and payload bytes) on {@code DLQ_TAG}, while the convertible
     * messages still arrive as rows on {@code MAIN_TAG}.
     *
     * <p>Two bad payloads are used: a JSON object that lacks the required {@code id}/{@code name}
     * fields, and a truncated JSON document.
     */
    @Test
    public void testSendsInvalidToDLQ() {
        Schema payloadSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        Schema messageSchema = messageSchemaFor(payloadSchema);

        PCollectionTuple outputs =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr1", "val1"), "{ \"invalid1\" : \"sdfsd\" }"),
                        message(2, map("attr2", "val2"), "{ \"invalid2"),
                        message(3, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(4, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }")))
                .apply(
                    "convert",
                    ParDo.of(
                            PubsubMessageToRow.builder()
                                .messageSchema(messageSchema)
                                .useDlq(true)
                                .build())
                        .withOutputTags(
                            PubsubMessageToRow.MAIN_TAG,
                            TupleTagList.of(PubsubMessageToRow.DLQ_TAG)));

        PCollection<Row> rows = outputs.get(PubsubMessageToRow.MAIN_TAG);
        PCollection<PubsubMessage> dlqMessages = outputs.get(PubsubMessageToRow.DLQ_TAG);

        PAssert.that(dlqMessages)
            .satisfies(
                messages -> {
                    Assert.assertEquals(2L, Iterables.size(messages));
                    // Compare as sets: the order of elements in a PCollection is not fixed.
                    Assert.assertEquals(
                        ImmutableSet.of(map("attr1", "val1"), map("attr2", "val2")),
                        convertToSet(messages, PubsubMessage::getAttributeMap));
                    Assert.assertEquals(
                        ImmutableSet.of("{ \"invalid1\" : \"sdfsd\" }", "{ \"invalid2"),
                        convertToSet(
                            messages, m -> new String(m.getPayload(), StandardCharsets.UTF_8)));
                    // SerializableFunction<Iterable<PubsubMessage>, Void> has nothing to return.
                    return null;
                });

        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(3L), map("attr", "val"), row(payloadSchema, 3, "foo")),
                row(messageSchema, ts(4L), map("bttr", "vbl"), row(payloadSchema, 5, "baz")));

        pipeline.run();
    }

    /**
     * Builds the message-level schema used by both tests: an {@code event_timestamp} date-time
     * field, a VARCHAR-to-VARCHAR {@code attributes} map and a nested {@code payload} row.
     *
     * @param payloadSchema schema of the nested {@code payload} row
     * @return the enclosing message schema
     */
    private static Schema messageSchemaFor(Schema payloadSchema) {
        return Schema.builder()
            .addDateTimeField("event_timestamp")
            .addMapField("attributes", CalciteUtils.VARCHAR, CalciteUtils.VARCHAR)
            .addRowField("payload", payloadSchema)
            .build();
    }

    /**
     * Builds a {@link Row} of the given schema from the field values in declaration order.
     *
     * @param schema schema of the row to build
     * @param objects field values; individual values may be {@code null}
     * @return the built row
     */
    private Row row(Schema schema, Object... objects) {
        return Row.withSchema(schema).addValues(objects).build();
    }

    /**
     * Returns an immutable single-entry attribute map.
     *
     * @param attr the attribute key
     * @param val the attribute value
     * @return a map containing only {@code attr -> val}
     */
    private Map<String, String> map(String attr, String val) {
        return ImmutableMap.of(attr, val);
    }

    /**
     * Wraps a UTF-8 encoded payload and its attributes in a {@link PubsubMessage} and stamps it
     * with the given event time.
     *
     * @param timestamp event time in milliseconds since the epoch
     * @param attributes message attributes
     * @param payload message payload text, encoded as UTF-8
     * @return the timestamped message, ready for {@code Create.timestamped(...)}
     */
    private TimestampedValue<PubsubMessage> message(
            int timestamp, Map<String, String> attributes, String payload) {
        PubsubMessage pubsubMessage =
            new PubsubMessage(payload.getBytes(StandardCharsets.UTF_8), attributes);
        return TimestampedValue.of(pubsubMessage, ts(timestamp));
    }

    /**
     * Converts milliseconds since the epoch into a Joda {@link Instant}.
     *
     * @param epochMills milliseconds since 1970-01-01T00:00:00Z
     * @return the corresponding instant
     */
    private Instant ts(long epochMills) {
        return new DateTime(epochMills).toInstant();
    }

    /**
     * Applies {@code mapper} to every message and collects the distinct results.
     *
     * @param messages messages to project
     * @param mapper extracts the value of interest from each message
     * @param <V> type of the extracted values
     * @return the set of extracted values
     */
    private static <V> Set<V> convertToSet(
            Iterable<PubsubMessage> messages, Function<? super PubsubMessage, V> mapper) {
        return StreamSupport.stream(messages.spliterator(), false)
            .map(mapper)
            .collect(Collectors.toSet());
    }
}

