package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.GetSchemaRequest;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaServiceGrpc;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.class */
public class PubsubGrpcClientTest {
    private ManagedChannel inProcessChannel;
    private PubsubClient client;
    private String channelName;
    private static final long REQ_TIME_MS = 1234;
    private static final long PUB_TIME_MS = 3456;
    private static final long MESSAGE_TIME_MS = 6789;
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String ID_ATTRIBUTE = "id";
    private static final String MESSAGE_ID = "testMessageId";
    private static final String DATA = "testData";
    private static final String RECORD_ID = "testRecordId";
    private static final String ACK_ID = "testAckId";
    private static final PubsubClient.TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
    private static final PubsubClient.SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
    private static final PubsubClient.SchemaPath SCHEMA = PubsubClient.schemaPathFromId("testProject", "testSchemaId");
    private static final ImmutableMap<String, String> ATTRIBUTES = ImmutableMap.builder().put("a", "b").put("c", "d").build();

    @Before
    public void setup() {
        this.channelName = String.format("%s-%s", PubsubGrpcClientTest.class.getName(), Integer.valueOf(ThreadLocalRandom.current().nextInt()));
        this.inProcessChannel = InProcessChannelBuilder.forName(this.channelName).directExecutor().build();
    }

    protected void initializeClient(String str, String str2) {
        this.client = new PubsubGrpcClient(str, str2, 10, this.inProcessChannel, new TestCredential());
    }

    @After
    public void teardown() throws IOException {
        this.client.close();
        this.inProcessChannel.shutdownNow();
    }

    @Test
    public void pullOneMessage() throws IOException {
        initializeClient(null, null);
        PullRequest build = PullRequest.newBuilder().setSubscription(SUBSCRIPTION.getPath()).setReturnImmediately(true).setMaxMessages(10).build();
        final PullResponse build2 = PullResponse.newBuilder().addAllReceivedMessages(ImmutableList.of(ReceivedMessage.newBuilder().setMessage(PubsubMessage.newBuilder().setMessageId(MESSAGE_ID).setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8))).setPublishTime(Timestamp.newBuilder().setSeconds(3L).setNanos(456000000).build()).build()).setAckId(ACK_ID).build())).build();
        final ArrayList arrayList = new ArrayList();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new SubscriberGrpc.SubscriberImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.1
            public void pull(PullRequest pullRequest, StreamObserver<PullResponse> streamObserver) {
                arrayList.add(pullRequest);
                streamObserver.onNext(build2);
                streamObserver.onCompleted();
            }
        }).build().start();
        try {
            List pull = this.client.pull(REQ_TIME_MS, SUBSCRIPTION, 10, true);
            Assert.assertEquals(1L, pull.size());
            PubsubClient.IncomingMessage incomingMessage = (PubsubClient.IncomingMessage) pull.get(0);
            Assert.assertEquals(ACK_ID, incomingMessage.ackId());
            Assert.assertEquals(DATA, incomingMessage.message().getData().toStringUtf8());
            Assert.assertEquals(MESSAGE_ID, incomingMessage.recordId());
            Assert.assertEquals(REQ_TIME_MS, incomingMessage.requestTimeMsSinceEpoch());
            Assert.assertEquals(PUB_TIME_MS, incomingMessage.timestampMsSinceEpoch());
            Assert.assertEquals(build, Iterables.getOnlyElement(arrayList));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void pullOneMessageUsingAttributes() throws IOException {
        initializeClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE);
        PullRequest build = PullRequest.newBuilder().setSubscription(SUBSCRIPTION.getPath()).setReturnImmediately(true).setMaxMessages(10).build();
        final PullResponse build2 = PullResponse.newBuilder().addAllReceivedMessages(ImmutableList.of(ReceivedMessage.newBuilder().setMessage(PubsubMessage.newBuilder().setMessageId(MESSAGE_ID).setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8))).setPublishTime(Timestamp.newBuilder().setSeconds(3L).setNanos(456000000).build()).putAllAttributes(ATTRIBUTES).putAllAttributes(ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME_MS), ID_ATTRIBUTE, RECORD_ID)).build()).setAckId(ACK_ID).build())).build();
        final ArrayList arrayList = new ArrayList();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new SubscriberGrpc.SubscriberImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.2
            public void pull(PullRequest pullRequest, StreamObserver<PullResponse> streamObserver) {
                arrayList.add(pullRequest);
                streamObserver.onNext(build2);
                streamObserver.onCompleted();
            }
        }).build().start();
        try {
            List pull = this.client.pull(REQ_TIME_MS, SUBSCRIPTION, 10, true);
            Assert.assertEquals(1L, pull.size());
            PubsubClient.IncomingMessage incomingMessage = (PubsubClient.IncomingMessage) pull.get(0);
            Assert.assertEquals(ACK_ID, incomingMessage.ackId());
            Assert.assertEquals(DATA, incomingMessage.message().getData().toStringUtf8());
            Assert.assertEquals(RECORD_ID, incomingMessage.recordId());
            Assert.assertEquals(REQ_TIME_MS, incomingMessage.requestTimeMsSinceEpoch());
            Assert.assertEquals(MESSAGE_TIME_MS, incomingMessage.timestampMsSinceEpoch());
            Assert.assertEquals(build, Iterables.getOnlyElement(arrayList));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void publishOneMessage() throws IOException {
        initializeClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE);
        PublishRequest build = PublishRequest.newBuilder().setTopic(TOPIC.getPath()).addAllMessages(ImmutableList.of(PubsubMessage.newBuilder().setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8))).putAllAttributes(ATTRIBUTES).putAllAttributes(ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME_MS), ID_ATTRIBUTE, RECORD_ID)).build())).build();
        final PublishResponse build2 = PublishResponse.newBuilder().addAllMessageIds(ImmutableList.of(MESSAGE_ID)).build();
        final ArrayList arrayList = new ArrayList();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new PublisherGrpc.PublisherImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.3
            public void publish(PublishRequest publishRequest, StreamObserver<PublishResponse> streamObserver) {
                arrayList.add(publishRequest);
                streamObserver.onNext(build2);
                streamObserver.onCompleted();
            }
        }).build().start();
        try {
            Assert.assertEquals(1L, this.client.publish(TOPIC, ImmutableList.of(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).putAllAttributes(ATTRIBUTES).build(), MESSAGE_TIME_MS, RECORD_ID))));
            Assert.assertEquals(build, Iterables.getOnlyElement(arrayList));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void getSchemaPath() throws IOException {
        initializeClient(null, null);
        final PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath("projects/testProject/topics/idontexist");
        final PubsubClient.TopicPath topicPath2 = PubsubClient.topicPathFromPath("projects/testProject/topics/deletedSchema");
        final PubsubClient.TopicPath topicPath3 = PubsubClient.topicPathFromPath("projects/testProject/topics/noSchema");
        final PubsubClient.TopicPath topicPath4 = PubsubClient.topicPathFromPath("projects/testProject/topics/topicWithSchema");
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new PublisherGrpc.PublisherImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.4
            public void getTopic(GetTopicRequest getTopicRequest, StreamObserver<Topic> streamObserver) {
                String topic = getTopicRequest.getTopic();
                if (topic.equals(topicPath.getPath())) {
                    streamObserver.onError(new IOException(String.format("%s does not exist", topic)));
                }
                if (topic.equals(topicPath2.getPath())) {
                    streamObserver.onNext(Topic.newBuilder().setName(topic).setSchemaSettings(SchemaSettings.newBuilder().setSchema("_deleted-schema_").build()).build());
                    streamObserver.onCompleted();
                }
                if (topic.equals(topicPath3.getPath())) {
                    streamObserver.onNext(Topic.newBuilder().setName(topic).build());
                    streamObserver.onCompleted();
                }
                if (topic.equals(topicPath4.getPath())) {
                    streamObserver.onNext(Topic.newBuilder().setName(topic).setSchemaSettings(SchemaSettings.newBuilder().setSchema(PubsubGrpcClientTest.SCHEMA.getPath()).build()).build());
                    streamObserver.onCompleted();
                }
            }
        }).build().start();
        try {
            Assert.assertThrows("topic does not exist", StatusRuntimeException.class, () -> {
                this.client.getSchemaPath(topicPath);
            });
            Assert.assertNull("topic with deleted Schema should return null SchemaPath", this.client.getSchemaPath(topicPath2));
            Assert.assertNull("topic without Schema should return null SchemaPath", this.client.getSchemaPath(topicPath3));
            Assert.assertEquals(SCHEMA.getPath(), this.client.getSchemaPath(topicPath4).getPath());
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void getAvroSchema() throws IOException {
        initializeClient(null, null);
        final Schema build = Schema.newBuilder().setName(SCHEMA.getPath()).setType(Schema.Type.AVRO).setDefinition("{ \"type\" : \"record\", \"name\" : \"Avro\", \"fields\" : [   {     \"name\" : \"StringField\",     \"type\" : \"string\"   },   {     \"name\" : \"FloatField\",     \"type\" : \"float\"   },   {     \"name\" : \"BooleanField\",     \"type\" : \"boolean\"   } ]}").build();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new SchemaServiceGrpc.SchemaServiceImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.5
            public void getSchema(GetSchemaRequest getSchemaRequest, StreamObserver<Schema> streamObserver) {
                if (getSchemaRequest.getName().equals(PubsubGrpcClientTest.SCHEMA.getPath())) {
                    streamObserver.onNext(build);
                    streamObserver.onCompleted();
                }
            }
        }).build().start();
        try {
            Assert.assertEquals(org.apache.beam.sdk.schemas.Schema.of(new Schema.Field[]{Schema.Field.of("StringField", Schema.FieldType.STRING), Schema.Field.of("FloatField", Schema.FieldType.FLOAT), Schema.Field.of("BooleanField", Schema.FieldType.BOOLEAN)}), this.client.getSchema(SCHEMA));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void getProtoSchema() throws IOException {
        initializeClient(null, null);
        final com.google.pubsub.v1.Schema build = com.google.pubsub.v1.Schema.newBuilder().setName(SCHEMA.getPath()).setType(Schema.Type.PROTOCOL_BUFFER).setDefinition("syntax = \"proto3\"; message ProtocolBuffer { string string_field = 1; int32 int_field = 2; }").build();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new SchemaServiceGrpc.SchemaServiceImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.6
            public void getSchema(GetSchemaRequest getSchemaRequest, StreamObserver<com.google.pubsub.v1.Schema> streamObserver) {
                if (getSchemaRequest.getName().equals(PubsubGrpcClientTest.SCHEMA.getPath())) {
                    streamObserver.onNext(build);
                    streamObserver.onCompleted();
                }
            }
        }).build().start();
        try {
            Assert.assertThrows("Pub/Sub Schema type PROTOCOL_BUFFER is not supported at this time", IllegalArgumentException.class, () -> {
                this.client.getSchema(SCHEMA);
            });
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }
}
