package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.class */
public class PubsubGrpcClientTest {
    private ManagedChannel inProcessChannel;
    private PubsubClient client;
    private String channelName;
    private static final long REQ_TIME = 1234;
    private static final long PUB_TIME = 3456;
    private static final long MESSAGE_TIME = 6789;
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String ID_ATTRIBUTE = "id";
    private static final String MESSAGE_ID = "testMessageId";
    private static final String DATA = "testData";
    private static final String RECORD_ID = "testRecordId";
    private static final String ACK_ID = "testAckId";
    private static final PubsubClient.TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
    private static final PubsubClient.SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
    private static final ImmutableMap<String, String> ATTRIBUTES = ImmutableMap.builder().put("a", "b").put("c", "d").build();

    @Before
    public void setup() {
        this.channelName = String.format("%s-%s", PubsubGrpcClientTest.class.getName(), Integer.valueOf(ThreadLocalRandom.current().nextInt()));
        this.inProcessChannel = InProcessChannelBuilder.forName(this.channelName).directExecutor().build();
        this.client = new PubsubGrpcClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, this.inProcessChannel, new TestCredential());
    }

    @After
    public void teardown() throws IOException {
        this.client.close();
        this.inProcessChannel.shutdownNow();
    }

    @Test
    public void pullOneMessage() throws IOException {
        PullRequest build = PullRequest.newBuilder().setSubscription(SUBSCRIPTION.getPath()).setReturnImmediately(true).setMaxMessages(10).build();
        final PullResponse build2 = PullResponse.newBuilder().addAllReceivedMessages(ImmutableList.of(ReceivedMessage.newBuilder().setMessage(PubsubMessage.newBuilder().setMessageId(MESSAGE_ID).setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8))).setPublishTime(Timestamp.newBuilder().setSeconds(3L).setNanos(456000).build()).putAllAttributes(ATTRIBUTES).putAllAttributes(ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID)).build()).setAckId(ACK_ID).build())).build();
        final ArrayList arrayList = new ArrayList();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new SubscriberGrpc.SubscriberImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.1
            public void pull(PullRequest pullRequest, StreamObserver<PullResponse> streamObserver) {
                arrayList.add(pullRequest);
                streamObserver.onNext(build2);
                streamObserver.onCompleted();
            }
        }).build().start();
        try {
            List pull = this.client.pull(REQ_TIME, SUBSCRIPTION, 10, true);
            Assert.assertEquals(1L, pull.size());
            PubsubClient.IncomingMessage incomingMessage = (PubsubClient.IncomingMessage) pull.get(0);
            Assert.assertEquals(ACK_ID, incomingMessage.ackId);
            Assert.assertEquals(DATA, new String(incomingMessage.elementBytes, StandardCharsets.UTF_8));
            Assert.assertEquals(RECORD_ID, incomingMessage.recordId);
            Assert.assertEquals(REQ_TIME, incomingMessage.requestTimeMsSinceEpoch);
            Assert.assertEquals(MESSAGE_TIME, incomingMessage.timestampMsSinceEpoch);
            Assert.assertEquals(build, Iterables.getOnlyElement(arrayList));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }

    @Test
    public void publishOneMessage() throws IOException {
        PublishRequest build = PublishRequest.newBuilder().setTopic(TOPIC.getPath()).addAllMessages(ImmutableList.of(PubsubMessage.newBuilder().setData(ByteString.copyFrom(DATA.getBytes(StandardCharsets.UTF_8))).putAllAttributes(ATTRIBUTES).putAllAttributes(ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), ID_ATTRIBUTE, RECORD_ID)).build())).build();
        final PublishResponse build2 = PublishResponse.newBuilder().addAllMessageIds(ImmutableList.of(MESSAGE_ID)).build();
        final ArrayList arrayList = new ArrayList();
        Server start = InProcessServerBuilder.forName(this.channelName).addService(new PublisherGrpc.PublisherImplBase() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClientTest.2
            public void publish(PublishRequest publishRequest, StreamObserver<PublishResponse> streamObserver) {
                arrayList.add(publishRequest);
                streamObserver.onNext(build2);
                streamObserver.onCompleted();
            }
        }).build().start();
        try {
            Assert.assertEquals(1L, this.client.publish(TOPIC, ImmutableList.of(new PubsubClient.OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_TIME, RECORD_ID))));
            Assert.assertEquals(build, Iterables.getOnlyElement(arrayList));
            start.shutdownNow();
        } catch (Throwable th) {
            start.shutdownNow();
            throw th;
        }
    }
}
