/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auth.Credentials;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubGrpcClient;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class PubsubGrpcClientTest {
    private @UnknownKeyFor @NonNull @Initialized ManagedChannel inProcessChannel;
    private @UnknownKeyFor @NonNull @Initialized PubsubClient client;
    private @UnknownKeyFor @NonNull @Initialized String channelName;
    private static final // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized PubsubClient.TopicPath TOPIC = PubsubClient.topicPathFromName((String)"testProject", (String)"testTopic");
    private static final // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized PubsubClient.SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName((String)"testProject", (String)"testSubscription");
    private static final @UnknownKeyFor @NonNull @Initialized long REQ_TIME = 1234L;
    private static final @UnknownKeyFor @NonNull @Initialized long PUB_TIME = 3456L;
    private static final @UnknownKeyFor @NonNull @Initialized long MESSAGE_TIME = 6789L;
    private static final @UnknownKeyFor @NonNull @Initialized String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final @UnknownKeyFor @NonNull @Initialized String ID_ATTRIBUTE = "id";
    private static final @UnknownKeyFor @NonNull @Initialized String MESSAGE_ID = "testMessageId";
    private static final @UnknownKeyFor @NonNull @Initialized String DATA = "testData";
    private static final @UnknownKeyFor @NonNull @Initialized String RECORD_ID = "testRecordId";
    private static final @UnknownKeyFor @NonNull @Initialized String ACK_ID = "testAckId";
    private static final @UnknownKeyFor @NonNull @Initialized ImmutableMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> ATTRIBUTES = ImmutableMap.builder().put((Object)"a", (Object)"b").put((Object)"c", (Object)"d").build();

    @Before
    public void setup() {
        this.channelName = String.format("%s-%s", PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt());
        this.inProcessChannel = ((InProcessChannelBuilder)InProcessChannelBuilder.forName((String)this.channelName).directExecutor()).build();
        TestCredential testCredentials = new TestCredential();
        this.client = new PubsubGrpcClient(TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, this.inProcessChannel, (Credentials)testCredentials);
    }

    @After
    public void teardown() throws @UnknownKeyFor @NonNull @Initialized IOException {
        this.client.close();
        this.inProcessChannel.shutdownNow();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void pullOneMessage() throws @UnknownKeyFor @NonNull @Initialized IOException {
        String expectedSubscription = SUBSCRIPTION.getPath();
        PullRequest expectedRequest = PullRequest.newBuilder().setSubscription(expectedSubscription).setReturnImmediately(true).setMaxMessages(10).build();
        Timestamp timestamp = Timestamp.newBuilder().setSeconds(3L).setNanos(456000).build();
        PubsubMessage expectedPubsubMessage = PubsubMessage.newBuilder().setMessageId(MESSAGE_ID).setData(ByteString.copyFrom((byte[])DATA.getBytes(StandardCharsets.UTF_8))).setPublishTime(timestamp).putAllAttributes(ATTRIBUTES).putAllAttributes((Map)ImmutableMap.of((Object)TIMESTAMP_ATTRIBUTE, (Object)String.valueOf(6789L), (Object)ID_ATTRIBUTE, (Object)RECORD_ID)).build();
        ReceivedMessage expectedReceivedMessage = ReceivedMessage.newBuilder().setMessage(expectedPubsubMessage).setAckId(ACK_ID).build();
        final PullResponse response = PullResponse.newBuilder().addAllReceivedMessages((Iterable)ImmutableList.of((Object)expectedReceivedMessage)).build();
        final ArrayList requestsReceived = new ArrayList();
        SubscriberGrpc.SubscriberImplBase subscriberImplBase = new SubscriberGrpc.SubscriberImplBase(){

            public void pull(@UnknownKeyFor @NonNull @Initialized PullRequest request, @UnknownKeyFor @NonNull @Initialized StreamObserver<@UnknownKeyFor @NonNull @Initialized PullResponse> responseObserver) {
                requestsReceived.add(request);
                responseObserver.onNext((Object)response);
                responseObserver.onCompleted();
            }
        };
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)this.channelName).addService((BindableService)subscriberImplBase)).build().start();
        try {
            List acutalMessages = this.client.pull(1234L, SUBSCRIPTION, 10, true);
            Assert.assertEquals((long)1L, (long)acutalMessages.size());
            PubsubClient.IncomingMessage actualMessage = (PubsubClient.IncomingMessage)acutalMessages.get(0);
            Assert.assertEquals((Object)ACK_ID, (Object)actualMessage.ackId());
            Assert.assertEquals((Object)DATA, (Object)actualMessage.message().getData().toStringUtf8());
            Assert.assertEquals((Object)RECORD_ID, (Object)actualMessage.recordId());
            Assert.assertEquals((long)1234L, (long)actualMessage.requestTimeMsSinceEpoch());
            Assert.assertEquals((long)6789L, (long)actualMessage.timestampMsSinceEpoch());
            Assert.assertEquals((Object)expectedRequest, (Object)Iterables.getOnlyElement(requestsReceived));
        }
        finally {
            server.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void publishOneMessage() throws @UnknownKeyFor @NonNull @Initialized IOException {
        String expectedTopic = TOPIC.getPath();
        PubsubMessage expectedPubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])DATA.getBytes(StandardCharsets.UTF_8))).putAllAttributes(ATTRIBUTES).putAllAttributes((Map)ImmutableMap.of((Object)TIMESTAMP_ATTRIBUTE, (Object)String.valueOf(6789L), (Object)ID_ATTRIBUTE, (Object)RECORD_ID)).build();
        PublishRequest expectedRequest = PublishRequest.newBuilder().setTopic(expectedTopic).addAllMessages((Iterable)ImmutableList.of((Object)expectedPubsubMessage)).build();
        final PublishResponse response = PublishResponse.newBuilder().addAllMessageIds((Iterable)ImmutableList.of((Object)MESSAGE_ID)).build();
        final ArrayList requestsReceived = new ArrayList();
        PublisherGrpc.PublisherImplBase publisherImplBase = new PublisherGrpc.PublisherImplBase(){

            public void publish(@UnknownKeyFor @NonNull @Initialized PublishRequest request, @UnknownKeyFor @NonNull @Initialized StreamObserver<@UnknownKeyFor @NonNull @Initialized PublishResponse> responseObserver) {
                requestsReceived.add(request);
                responseObserver.onNext((Object)response);
                responseObserver.onCompleted();
            }
        };
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)this.channelName).addService((BindableService)publisherImplBase)).build().start();
        try {
            PubsubClient.OutgoingMessage actualMessage = PubsubClient.OutgoingMessage.of((PubsubMessage)PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8((String)DATA)).putAllAttributes(ATTRIBUTES).build(), (long)6789L, (String)RECORD_ID);
            int n = this.client.publish(TOPIC, (List)ImmutableList.of((Object)actualMessage));
            Assert.assertEquals((long)1L, (long)n);
            Assert.assertEquals((Object)expectedRequest, (Object)Iterables.getOnlyElement(requestsReceived));
        }
        finally {
            server.shutdownNow();
        }
    }
}

