package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.Iterables;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.class */
/**
 * Tests for {@link PubsubMessageToRow}: converting {@link PubsubMessage}s that carry JSON payloads
 * into {@link Row}s, using both the nested schema (event timestamp, attributes map, payload row)
 * and the flat schema (event timestamp followed by the payload fields), with and without a
 * dead-letter queue (DLQ) output.
 *
 * <p>All helpers are {@code static} so that the serializable lambda handed to {@link PAssert} does
 * not capture the test instance.
 */
public class PubsubMessageToRowTest implements Serializable {

    /** Well-formed JSON whose only field is not one of the {@code id}/{@code name} payload fields. */
    private static final String UNEXPECTED_FIELD_PAYLOAD = "{ \"invalid1\" : \"sdfsd\" }";

    /** Incomplete JSON document (the object is never closed). */
    private static final String MALFORMED_PAYLOAD = "{ \"invalid2";

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    /** Nested schema, no DLQ: every message becomes a row, including one with JSON nulls. */
    @Test
    public void testConvertsMessages() {
        Schema payloadSchema =
            Schema.builder()
                .addNullableField("id", Schema.FieldType.INT32)
                .addNullableField("name", Schema.FieldType.STRING)
                .build();
        Schema messageSchema =
            Schema.builder()
                .addDateTimeField("event_timestamp")
                .addMapField("attributes", CalciteUtils.VARCHAR, CalciteUtils.VARCHAR)
                .addRowField("payload", payloadSchema)
                .build();

        PCollection<Row> rows =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(2, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"),
                        message(3, map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : null, \"id\" : null }")))
                .apply(
                    "convert",
                    ParDo.of(
                        PubsubMessageToRow.builder()
                            .messageSchema(messageSchema)
                            .useDlq(false)
                            .useFlatSchema(false)
                            .build()));

        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(1), map("attr", "val"), row(payloadSchema, 3, "foo")),
                row(messageSchema, ts(2), map("bttr", "vbl"), row(payloadSchema, 5, "baz")),
                row(messageSchema, ts(3), map("cttr", "vcl"), row(payloadSchema, 7, "bar")),
                row(messageSchema, ts(4), map("dttr", "vdl"), row(payloadSchema, 8, "qaz")),
                row(messageSchema, ts(4), map("dttr", "vdl"), row(payloadSchema, null, null)));

        pipeline.run();
    }

    /** Nested schema with DLQ: unparseable messages go to the DLQ output, the rest become rows. */
    @Test
    public void testSendsInvalidToDLQ() {
        Schema payloadSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
        Schema messageSchema =
            Schema.builder()
                .addDateTimeField("event_timestamp")
                .addMapField("attributes", CalciteUtils.VARCHAR, CalciteUtils.VARCHAR)
                .addRowField("payload", payloadSchema)
                .build();

        PCollectionTuple outputs =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr1", "val1"), UNEXPECTED_FIELD_PAYLOAD),
                        message(2, map("attr2", "val2"), MALFORMED_PAYLOAD),
                        message(3, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(4, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }")))
                .apply(
                    "convert",
                    ParDo.of(
                            PubsubMessageToRow.builder()
                                .messageSchema(messageSchema)
                                .useDlq(true)
                                .useFlatSchema(false)
                                .build())
                        .withOutputTags(
                            PubsubMessageToRow.MAIN_TAG,
                            TupleTagList.of(PubsubMessageToRow.DLQ_TAG)));

        PCollection<Row> rows = outputs.get(PubsubMessageToRow.MAIN_TAG);
        PCollection<PubsubMessage> dlqMessages = outputs.get(PubsubMessageToRow.DLQ_TAG);

        assertDlqHoldsInvalidMessages(dlqMessages);
        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(3), map("attr", "val"), row(payloadSchema, 3, "foo")),
                row(messageSchema, ts(4), map("bttr", "vbl"), row(payloadSchema, 5, "baz")));

        pipeline.run();
    }

    /** Flat schema, no DLQ: payload fields are placed directly after the event timestamp. */
    @Test
    public void testConvertsMessagesToFlatRow() {
        Schema messageSchema =
            Schema.builder()
                .addDateTimeField("event_timestamp")
                .addNullableField("id", Schema.FieldType.INT32)
                .addNullableField("name", Schema.FieldType.STRING)
                .build();

        PCollection<Row> rows =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(2, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"),
                        message(3, map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }"),
                        message(4, map("dttr", "vdl"), "{ \"name\" : null, \"id\" : null }")))
                .apply(
                    "convert",
                    ParDo.of(
                        PubsubMessageToRow.builder()
                            .messageSchema(messageSchema)
                            .useDlq(false)
                            .useFlatSchema(true)
                            .build()));

        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(1), 3, "foo"),
                row(messageSchema, ts(2), 5, "baz"),
                row(messageSchema, ts(3), 7, "bar"),
                row(messageSchema, ts(4), 8, "qaz"),
                row(messageSchema, ts(4), null, null));

        pipeline.run();
    }

    /** Flat schema with DLQ: unparseable messages go to the DLQ output, the rest become rows. */
    @Test
    public void testSendsFlatRowInvalidToDLQ() {
        Schema messageSchema =
            Schema.builder()
                .addDateTimeField("event_timestamp")
                .addInt32Field("id")
                .addStringField("name")
                .build();

        PCollectionTuple outputs =
            pipeline
                .apply(
                    "create",
                    Create.timestamped(
                        message(1, map("attr1", "val1"), UNEXPECTED_FIELD_PAYLOAD),
                        message(2, map("attr2", "val2"), MALFORMED_PAYLOAD),
                        message(3, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
                        message(4, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }")))
                .apply(
                    "convert",
                    ParDo.of(
                            PubsubMessageToRow.builder()
                                .messageSchema(messageSchema)
                                .useDlq(true)
                                .useFlatSchema(true)
                                .build())
                        .withOutputTags(
                            PubsubMessageToRow.MAIN_TAG,
                            TupleTagList.of(PubsubMessageToRow.DLQ_TAG)));

        PCollection<Row> rows = outputs.get(PubsubMessageToRow.MAIN_TAG);
        PCollection<PubsubMessage> dlqMessages = outputs.get(PubsubMessageToRow.DLQ_TAG);

        assertDlqHoldsInvalidMessages(dlqMessages);
        PAssert.that(rows)
            .containsInAnyOrder(
                row(messageSchema, ts(3), 3, "foo"),
                row(messageSchema, ts(4), 5, "baz"));

        pipeline.run();
    }

    /**
     * Registers an assertion that the DLQ output holds exactly the two invalid messages fed in by
     * the DLQ tests, with their attributes and raw payload bytes unchanged.
     *
     * @param dlqMessages the collection obtained from {@code PubsubMessageToRow.DLQ_TAG}
     */
    private static void assertDlqHoldsInvalidMessages(PCollection<PubsubMessage> dlqMessages) {
        PAssert.that(dlqMessages)
            .satisfies(
                messages -> {
                    Assert.assertEquals(2, Iterables.size(messages));
                    Assert.assertEquals(
                        ImmutableSet.of(map("attr1", "val1"), map("attr2", "val2")),
                        convertToSet(messages, PubsubMessage::getAttributeMap));
                    Assert.assertEquals(
                        ImmutableSet.of(UNEXPECTED_FIELD_PAYLOAD, MALFORMED_PAYLOAD),
                        convertToSet(
                            messages,
                            message -> new String(message.getPayload(), StandardCharsets.UTF_8)));
                    return null;
                });
    }

    /** Builds a {@link Row} of the given schema from the values, in field order. */
    private static Row row(Schema schema, Object... values) {
        return Row.withSchema(schema).addValues(values).build();
    }

    /** Returns an immutable single-entry attribute map. */
    private static Map<String, String> map(String key, String value) {
        return ImmutableMap.of(key, value);
    }

    /**
     * Builds a Pubsub message whose payload is the UTF-8 encoding of {@code payload}, paired with
     * the element timestamp {@code ts(timestampMillis)}.
     */
    private static TimestampedValue<PubsubMessage> message(
            int timestampMillis, Map<String, String> attributes, String payload) {
        PubsubMessage pubsubMessage =
            new PubsubMessage(payload.getBytes(StandardCharsets.UTF_8), attributes);
        return TimestampedValue.of(pubsubMessage, ts(timestampMillis));
    }

    /** Converts a millisecond value into a Joda {@link Instant}. */
    private static Instant ts(long millis) {
        return new DateTime(millis).toInstant();
    }

    /** Applies {@code extractor} to every message and collects the distinct results. */
    private static <V> Set<V> convertToSet(
            Iterable<PubsubMessage> messages, Function<? super PubsubMessage, V> extractor) {
        return StreamSupport.stream(messages.spliterator(), false)
            .map(extractor)
            .collect(Collectors.toSet());
    }
}
