package org.apache.hudi.utilities.sources.helpers.gcs;

import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubMessagesFetcher.class */
/**
 * Unit tests for {@link PubsubMessagesFetcher}.
 *
 * <p>Both collaborators, {@link PubsubQueueClient} and {@link SubscriberStub}, are Mockito mocks,
 * so every response the fetcher sees is one that a test stubbed explicitly.
 */
public class TestPubsubMessagesFetcher {
    private static final int SMALL_BATCH_SIZE = 1;
    private static final int MAX_MESSAGES_IN_REQUEST = 1000;
    private static final String PROJECT_ID = "test-project";
    private static final String SUBSCRIPTION_ID = "test-subscription";
    private static final String SUBSCRIPTION_NAME = ProjectSubscriptionName.format(PROJECT_ID, SUBSCRIPTION_ID);
    private static final long MAX_WAIT_TIME_IN_REQUEST = TimeUnit.SECONDS.toMillis(1);

    private final SubscriberStub mockSubscriber = Mockito.mock(SubscriberStub.class);
    private final PubsubQueueClient mockPubsubQueueClient = Mockito.mock(PubsubQueueClient.class);

    /**
     * With three un-acked messages reported and a batch size of one, the fetcher is expected to
     * issue three pull requests and return the three messages in the order they were pulled.
     */
    @Test
    public void testFetchMessages() throws IOException {
        stubSubscriber();
        Mockito.when(mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn(3L);
        // Consecutive stubbing: each pull request hands back the next single-message response.
        Mockito.when(mockPubsubQueueClient.makePullRequest(mockSubscriber, SUBSCRIPTION_NAME, SMALL_BATCH_SIZE))
                .thenReturn(singleMessageResponse("1", "msgId1"))
                .thenReturn(singleMessageResponse("2", "msgId2"))
                .thenReturn(singleMessageResponse("3", "msgId3"));

        List<ReceivedMessage> messages = newFetcher(MAX_WAIT_TIME_IN_REQUEST).fetchMessages();

        Assertions.assertEquals(3, messages.size());
        Assertions.assertEquals("1", messages.get(0).getAckId());
        Assertions.assertEquals("2", messages.get(1).getAckId());
        Assertions.assertEquals("3", messages.get(2).getAckId());
        Mockito.verify(mockPubsubQueueClient, Mockito.times(3))
                .makePullRequest(mockSubscriber, SUBSCRIPTION_NAME, SMALL_BATCH_SIZE);
    }

    /**
     * With a maximum fetch time of zero, no messages are expected back even though the client
     * reports 100 un-acked messages.
     */
    @Test
    public void testFetchMessagesZeroTimeout() throws IOException {
        stubSubscriber();
        Mockito.when(mockPubsubQueueClient.getNumUnAckedMessages(SUBSCRIPTION_ID)).thenReturn(100L);

        List<ReceivedMessage> messages = newFetcher(0L).fetchMessages();

        Assertions.assertEquals(0, messages.size());
    }

    /**
     * Acknowledging 20 message ids is expected to result in exactly two ack requests to the client.
     */
    @Test
    public void testSendAcks() throws IOException {
        stubSubscriber();
        List<String> ackIds = IntStream.range(0, 20)
                .mapToObj(i -> "msg_" + i)
                .collect(Collectors.toList());
        Mockito.doNothing().when(mockPubsubQueueClient).makeAckRequest(
                ArgumentMatchers.eq(mockSubscriber), ArgumentMatchers.eq(SUBSCRIPTION_NAME), ArgumentMatchers.any());

        newFetcher(MAX_WAIT_TIME_IN_REQUEST).sendAcks(ackIds);

        Mockito.verify(mockPubsubQueueClient, Mockito.times(2)).makeAckRequest(
                ArgumentMatchers.eq(mockSubscriber), ArgumentMatchers.eq(SUBSCRIPTION_NAME), ArgumentMatchers.any());
    }

    /** Makes the mocked queue client hand out the mocked subscriber, whose {@code close()} does nothing. */
    private void stubSubscriber() throws IOException {
        Mockito.doNothing().when(mockSubscriber).close();
        Mockito.when(mockPubsubQueueClient.getSubscriber(ArgumentMatchers.<SubscriberStubSettings>any()))
                .thenReturn(mockSubscriber);
    }

    /** Creates the fetcher under test with the shared test settings and the given maximum fetch time. */
    private PubsubMessagesFetcher newFetcher(long maxFetchTimeMillis) {
        return new PubsubMessagesFetcher(
                PROJECT_ID, SUBSCRIPTION_ID, SMALL_BATCH_SIZE, MAX_MESSAGES_IN_REQUEST, maxFetchTimeMillis,
                mockPubsubQueueClient);
    }

    /** Builds a pull response that carries exactly one received message with the given ids. */
    private static PullResponse singleMessageResponse(String ackId, String messageId) {
        PubsubMessage message = PubsubMessage.newBuilder().setMessageId(messageId).build();
        ReceivedMessage received = ReceivedMessage.newBuilder().setAckId(ackId).setMessage(message).build();
        return PullResponse.newBuilder().addReceivedMessages(received).build();
    }
}
