/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link PubsubTestClient}: drives the in-memory test client through pull,
 * ack-deadline, acknowledge and publish calls using a manually advanced clock.
 */
@RunWith(JUnit4.class)
public class PubsubTestClientTest {
    private static final PubsubClient.TopicPath TOPIC =
        PubsubClient.topicPathFromName("testProject", "testTopic");
    private static final PubsubClient.SubscriptionPath SUBSCRIPTION =
        PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
    private static final long REQ_TIME = 1234L;
    private static final long MESSAGE_TIME = 6789L;
    private static final String MESSAGE_ID = "testMessageId";
    private static final String DATA = "testData";
    private static final String ACK_ID = "testAckId";
    private static final int ACK_TIMEOUT_S = 60;

    /** Builds the protobuf message used as the payload in every test: {@link #DATA} as UTF-8. */
    private static PubsubMessage testMessage() {
        return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build();
    }

    /**
     * Returns a copy of {@code original} whose third {@code IncomingMessage.of} argument (the
     * slot that initially holds {@link #REQ_TIME}) is replaced by {@code requestTimeMs}; all
     * other fields are carried over unchanged.
     */
    private static PubsubClient.IncomingMessage withRequestTime(
            PubsubClient.IncomingMessage original, long requestTimeMs) {
        return PubsubClient.IncomingMessage.of(
            original.message(),
            original.timestampMsSinceEpoch(),
            requestTimeMs,
            original.ackId(),
            original.recordId());
    }

    /**
     * Pulls a single message three times: first normally, then again after the clock has moved
     * past {@link #ACK_TIMEOUT_S} without an ack, then again after an extended deadline has
     * lapsed; finally extends once more and acknowledges.
     */
    @Test
    public void pullOneMessage() throws IOException {
        // The client reads time through this clock, so the test controls "now" directly.
        AtomicLong now = new AtomicLong();
        Clock clock = now::get;
        PubsubClient.IncomingMessage expectedIncomingMessage =
            PubsubClient.IncomingMessage.of(
                testMessage(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
        try (PubsubTestClient.PubsubTestClientFactory factory =
                 PubsubTestClient.createFactoryForPull(
                     clock,
                     SUBSCRIPTION,
                     ACK_TIMEOUT_S,
                     Lists.newArrayList(expectedIncomingMessage));
             PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
            // Initial pull at REQ_TIME returns the message exactly as configured.
            now.set(REQ_TIME);
            client.advance();
            List<PubsubClient.IncomingMessage> incomingMessages =
                client.pull(now.get(), SUBSCRIPTION, 1, true);
            Assert.assertEquals(1, incomingMessages.size());
            Assert.assertEquals(expectedIncomingMessage, incomingMessages.get(0));

            // Move 70s ahead (10s more than ACK_TIMEOUT_S) without acking: the same message
            // is pulled again, now stamped with the current clock value.
            now.addAndGet((ACK_TIMEOUT_S + 10) * 1000L);
            client.advance();
            incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
            Assert.assertEquals(1, incomingMessages.size());
            Assert.assertEquals(
                withRequestTime(expectedIncomingMessage, now.get()), incomingMessages.get(0));

            // 10s later, extend the ack deadline by 20 (presumably seconds, matching
            // ACK_TIMEOUT_S -- confirm against PubsubClient#modifyAckDeadline).
            now.addAndGet(10 * 1000L);
            client.advance();
            client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);

            // Move 30s ahead, beyond the 20 extension: the message is pulled once more.
            now.addAndGet(30 * 1000L);
            client.advance();
            incomingMessages = client.pull(now.get(), SUBSCRIPTION, 1, true);
            Assert.assertEquals(1, incomingMessages.size());
            Assert.assertEquals(
                withRequestTime(expectedIncomingMessage, now.get()), incomingMessages.get(0));

            // Extend again, then acknowledge after 15s, i.e. inside the 20 extension.
            client.modifyAckDeadline(SUBSCRIPTION, ImmutableList.of(ACK_ID), 20);
            now.addAndGet(15 * 1000L);
            client.advance();
            client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
        }
    }

    /**
     * Publishes the single expected outgoing message. The test has no explicit assertions;
     * presumably the client/factory validate the published messages against the expected set
     * (on publish or on close) -- confirm in {@link PubsubTestClient}.
     */
    @Test
    public void publishOneMessage() throws IOException {
        PubsubClient.OutgoingMessage expectedOutgoingMessage =
            PubsubClient.OutgoingMessage.of(testMessage(), MESSAGE_TIME, MESSAGE_ID);
        try (PubsubTestClient.PubsubTestClientFactory factory =
                 PubsubTestClient.createFactoryForPublish(
                     TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of());
             PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
            client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
        }
    }

    /**
     * Pulls one message, acknowledges it 50ms of clock time later, then republishes an outgoing
     * message built from the pulled message's payload, timestamp and record id.
     */
    @Test
    public void testPullThenPublish() throws IOException {
        AtomicLong now = new AtomicLong();
        Clock clock = now::get;
        PubsubMessage message = testMessage();
        PubsubClient.IncomingMessage expectedIncomingMessage =
            PubsubClient.IncomingMessage.of(message, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
        PubsubClient.OutgoingMessage expectedOutgoingMessage =
            PubsubClient.OutgoingMessage.of(message, MESSAGE_TIME, MESSAGE_ID);
        try (PubsubTestClient.PubsubTestClientFactory factory =
                 PubsubTestClient.createFactoryForPullAndPublish(
                     SUBSCRIPTION,
                     TOPIC,
                     clock,
                     ACK_TIMEOUT_S,
                     ImmutableList.of(expectedIncomingMessage),
                     ImmutableList.of(expectedOutgoingMessage),
                     ImmutableList.of());
             PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
            // Pull at REQ_TIME.
            now.set(REQ_TIME);
            client.advance();
            List<PubsubClient.IncomingMessage> actualIncomingMessages =
                client.pull(now.get(), SUBSCRIPTION, 1, true);

            // Acknowledge shortly afterwards, before checking what was pulled.
            now.addAndGet(50L);
            client.advance();
            client.acknowledge(SUBSCRIPTION, ImmutableList.of(ACK_ID));
            Assert.assertEquals(1, actualIncomingMessages.size());
            Assert.assertEquals(expectedIncomingMessage, actualIncomingMessages.get(0));

            // Republish what was pulled; it must correspond to expectedOutgoingMessage.
            PubsubClient.IncomingMessage incomingMessage = actualIncomingMessages.get(0);
            PubsubClient.OutgoingMessage actualOutgoingMessage =
                PubsubClient.OutgoingMessage.of(
                    incomingMessage.message(),
                    incomingMessage.timestampMsSinceEpoch(),
                    incomingMessage.recordId());
            client.publish(TOPIC, ImmutableList.of(actualOutgoingMessage));
        }
    }
}

