package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.class */
public class PubsubReadSchemaTransformProviderTest {
    private static final Schema SCHEMA = Schema.of(new Schema.Field[]{Schema.Field.of("name", Schema.FieldType.STRING), Schema.Field.of("number", Schema.FieldType.INT64)});
    private static final String SUBSCRIPTION = "projects/project/subscriptions/subscription";
    private static final String TOPIC = "projects/project/topics/topic";
    private static final List<TestCase> cases = Arrays.asList(testCase("no configured topic or subscription", PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build()).expectInvalidConfiguration(), testCase("both topic and subscription configured", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setSubscription(TOPIC).setDataSchema(SCHEMA).build()).expectInvalidConfiguration(), testCase("invalid format configured", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setDataSchema(SCHEMA).setFormat("invalidformat").build()).expectInvalidConfiguration(), testCase("configuration with subscription", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setDataSchema(SCHEMA).build()).withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)), testCase("configuration with topic", PubsubReadSchemaTransformConfiguration.builder().setTopic(TOPIC).setDataSchema(SCHEMA).build()).withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)), testCase("configuration with subscription, timestamp and id attributes", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setTimestampAttribute("timestampAttribute").setIdAttribute("idAttribute").setDataSchema(SCHEMA).build()).withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION).withTimestampAttribute("timestampAttribute").withIdAttribute("idAttribute")), testCase("configuration with subscription and dead letter queue", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setDataSchema(SCHEMA).setDeadLetterQueue(TOPIC).build()).withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)).withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)), testCase("configuration with subscription, timestamp attribute, and dead letter queue", PubsubReadSchemaTransformConfiguration.builder().setSubscription(SUBSCRIPTION).setTimestampAttribute("timestampAttribute").setDataSchema(SCHEMA).setDeadLetterQueue(TOPIC).build()).withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION).withTimestampAttribute("timestampAttribute")).withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
    private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
    private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration> TYPE_DESCRIPTOR = TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class);
    private static final SerializableFunction<PubsubReadSchemaTransformConfiguration, Row> ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
    private static final List<Row> ROWS = Arrays.asList(Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 100L).build(), Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 200L).build(), Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 300L).build());
    private static final Clock CLOCK = (Serializable) () -> {
        return 1656788475425L;
    };
    private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER = new AvroPayloadSerializerProvider();
    private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER = AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap());

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest$TestCase.class */
    public static class TestCase {
        private final String name;
        private final PubsubReadSchemaTransformConfiguration configuration;
        private Map<DisplayData.Identifier, DisplayData.Item> expectedDeadLetterQueue;
        private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead = DisplayData.from(PubsubIO.readMessages()).asMap();
        private boolean invalidConfigurationExpected = false;

        TestCase(String str, PubsubReadSchemaTransformConfiguration pubsubReadSchemaTransformConfiguration) {
            this.name = str;
            this.configuration = pubsubReadSchemaTransformConfiguration;
        }

        SchemaTransform schemaTransform() {
            return new PubsubReadSchemaTransformProvider().from(toBeamRow());
        }

        PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() {
            return schemaTransform().buildTransform();
        }

        private Row toBeamRow() {
            return (Row) PubsubReadSchemaTransformProviderTest.ROW_SERIALIZABLE_FUNCTION.apply(this.configuration);
        }

        TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> write) {
            this.expectedDeadLetterQueue = DisplayData.from(write).asMap();
            return this;
        }

        TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> read) {
            this.expectedPubsubRead = DisplayData.from(read).asMap();
            return this;
        }

        TestCase expectInvalidConfiguration() {
            this.invalidConfigurationExpected = true;
            return this;
        }
    }

    @Test
    public void testBuildDeadLetterQueueWrite() {
        for (TestCase testCase : cases) {
            PubsubIO.Write buildDeadLetterQueueWrite = testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
            if (testCase.expectedDeadLetterQueue == null) {
                Assert.assertNull(testCase.name, buildDeadLetterQueueWrite);
                return;
            } else {
                Map asMap = DisplayData.from(buildDeadLetterQueueWrite).asMap();
                Assert.assertEquals(testCase.name, testCase.expectedDeadLetterQueue, asMap);
            }
        }
    }

    @Test
    public void testReadAvro() throws IOException {
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.p);
        PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock = schemaTransformWithClock("avro");
        PubsubTestClient.PubsubTestClientFactory clientFactory = clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
        schemaTransformWithClock.setClientFactory(clientFactory);
        PAssert.that(empty.apply(schemaTransformWithClock.buildTransform()).get("OUTPUT")).containsInAnyOrder(ROWS);
        this.p.run().waitUntilFinish();
        clientFactory.close();
    }

    @Test
    public void testReadJson() throws IOException {
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.p);
        PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock = schemaTransformWithClock("json");
        PubsubTestClient.PubsubTestClientFactory clientFactory = clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis()));
        schemaTransformWithClock.setClientFactory(clientFactory);
        PAssert.that(empty.apply(schemaTransformWithClock.buildTransform()).get("OUTPUT")).containsInAnyOrder(ROWS);
        this.p.run().waitUntilFinish();
        clientFactory.close();
    }

    @Test
    public void testBuildPubSubRead() {
        for (TestCase testCase : cases) {
            if (!testCase.invalidConfigurationExpected) {
                Map asMap = DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap();
                Assert.assertEquals(testCase.name, testCase.expectedPubsubRead, asMap);
            }
        }
    }

    @Test
    public void testInvalidConfiguration() {
        for (TestCase testCase : cases) {
            PCollectionRowTuple empty = PCollectionRowTuple.empty(this.p);
            if (testCase.invalidConfigurationExpected) {
                Assert.assertThrows(testCase.name, RuntimeException.class, () -> {
                    empty.apply(testCase.pubsubReadSchemaTransform().buildTransform());
                });
            }
        }
    }

    @Test
    public void testInvalidInput() {
        PCollectionRowTuple of = PCollectionRowTuple.of("BadInput", this.p.apply(Create.of(ROWS)));
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            of.apply(new PubsubReadSchemaTransformProvider().from(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build()).buildTransform());
        });
    }

    private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock(String str) {
        PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform buildTransform = new PubsubReadSchemaTransformProvider().from(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).setSubscription(SUBSCRIPTION).setFormat(str).build()).buildTransform();
        buildTransform.setClock(CLOCK);
        return buildTransform;
    }

    private static PubsubTestClient.PubsubTestClientFactory clientFactory(List<PubsubClient.IncomingMessage> list) {
        return PubsubTestClient.createFactoryForPull(CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, list);
    }

    private static List<PubsubClient.IncomingMessage> incomingAvroMessagesOf(long j) {
        return (List) ROWS.stream().map(row -> {
            return incomingAvroMessageOf(row, j);
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static PubsubClient.IncomingMessage incomingAvroMessageOf(Row row, long j) {
        return incomingMessageOf(AVRO_PAYLOAD_SERIALIZER.serialize(row), j);
    }

    private static List<PubsubClient.IncomingMessage> incomingJsonMessagesOf(long j) {
        return (List) ROWS.stream().map(row -> {
            return incomingJsonMessageOf(row, j);
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static PubsubClient.IncomingMessage incomingJsonMessageOf(Row row, long j) {
        return incomingJsonMessageOf((String) Objects.requireNonNull(row.getString("name")), ((Long) Objects.requireNonNull(row.getInt64("number"))).longValue(), j);
    }

    private static PubsubClient.IncomingMessage incomingJsonMessageOf(String str, long j, long j2) {
        Gson gson = new Gson();
        JsonObject jsonObject = new JsonObject();
        jsonObject.add("name", new JsonPrimitive(str));
        jsonObject.add("number", new JsonPrimitive(Long.valueOf(j)));
        return incomingMessageOf(gson.toJson(jsonObject).getBytes(StandardCharsets.UTF_8), j2);
    }

    private static PubsubClient.IncomingMessage incomingMessageOf(byte[] bArr, long j) {
        return PubsubClient.IncomingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFrom(bArr)).setPublishTime(Timestamp.newBuilder().setNanos(Long.valueOf(j).intValue() * 1000).build()).build(), j, 0L, UUID.randomUUID().toString(), UUID.randomUUID().toString());
    }

    static TestCase testCase(String str, PubsubReadSchemaTransformConfiguration pubsubReadSchemaTransformConfiguration) {
        return new TestCase(str, pubsubReadSchemaTransformConfiguration);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1148924021:
                if (implMethodName.equals("lambda$static$5c7dd581$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/google/api/client/util/Clock") && serializedLambda.getFunctionalInterfaceMethodName().equals("currentTimeMillis") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return () -> {
                        return 1656788475425L;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
