package com.google.pubsub.kafka.source;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SubscriberInterface;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:com/google/pubsub/kafka/source/StreamingPullSubscriberTest.class */
public class StreamingPullSubscriberTest {
    private StreamingPullSubscriber subscriber;
    private MessageReceiver messageReceiver;
    private ApiService.Listener errorListener;
    private final StreamingPullSubscriberFactory underlyingFactory = (StreamingPullSubscriberFactory) Mockito.mock(StreamingPullSubscriberFactory.class);
    private final SubscriberInterface underlying = (SubscriberInterface) Mockito.mock(SubscriberInterface.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    private static List<PubsubMessage> messagesFor(List<ReceivedMessage> list) {
        return (List) list.stream().map((v0) -> {
            return v0.getMessage();
        }).collect(Collectors.toList());
    }

    @Before
    public void setUp() throws Exception {
        Mockito.when(this.underlying.startAsync()).thenReturn(this.underlying);
        Mockito.when(this.underlyingFactory.newSubscriber((MessageReceiver) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            this.messageReceiver = (MessageReceiver) invocationOnMock.getArgument(0);
            return this.underlying;
        });
        ((SubscriberInterface) Mockito.doAnswer(invocationOnMock2 -> {
            this.errorListener = (ApiService.Listener) invocationOnMock2.getArgument(0);
            return null;
        }).when(this.underlying)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
        this.subscriber = new StreamingPullSubscriber(this.underlyingFactory);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.underlyingFactory, this.underlying});
        ((StreamingPullSubscriberFactory) inOrder.verify(this.underlyingFactory)).newSubscriber((MessageReceiver) ArgumentMatchers.any());
        ((SubscriberInterface) inOrder.verify(this.underlying)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
        ((SubscriberInterface) inOrder.verify(this.underlying)).startAsync();
        ((SubscriberInterface) inOrder.verify(this.underlying)).awaitRunning();
        Truth.assertThat(this.messageReceiver).isNotNull();
        Truth.assertThat(this.errorListener).isNotNull();
    }

    @Test
    public void closeStops() {
        Mockito.when(this.underlying.stopAsync()).thenReturn(this.underlying);
        this.subscriber.close();
        ((SubscriberInterface) Mockito.verify(this.underlying)).stopAsync();
        ((SubscriberInterface) Mockito.verify(this.underlying)).awaitTerminated();
    }

    @Test
    public void pullAfterErrorThrows() {
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        this.errorListener.failed((ApiService.State) null, apiException);
        Truth.assertThat(apiException).isEqualTo(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause());
    }

    @Test
    public void pullBeforeErrorThrows() throws Exception {
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        ApiFuture pull = this.subscriber.pull();
        Thread.sleep(1000L);
        Truth.assertThat(Boolean.valueOf(pull.isDone())).isFalse();
        this.errorListener.failed((ApiService.State) null, apiException);
        pull.getClass();
        Truth.assertThat(apiException).isEqualTo(((ExecutionException) Assert.assertThrows(ExecutionException.class, pull::get)).getCause());
    }

    @Test
    public void pullSuccess() throws Exception {
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        Future submit = this.executorService.submit(() -> {
            return (List) this.subscriber.pull().get();
        });
        this.messageReceiver.receiveMessage(build, (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class));
        Truth.assertThat(messagesFor((List) submit.get())).isEqualTo(ImmutableList.of(build));
    }

    @Test
    public void pullMultiple() throws Exception {
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        PubsubMessage build2 = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        this.messageReceiver.receiveMessage(build, (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class));
        this.messageReceiver.receiveMessage(build2, (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class));
        Truth.assertThat(messagesFor((List) this.subscriber.pull().get())).isEqualTo(ImmutableList.of(build, build2));
    }

    @Test
    public void pullMessageWhenError() {
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        this.errorListener.failed((ApiService.State) null, apiException);
        Truth.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause()).isEqualTo(apiException);
    }

    @Test
    public void messageAfterErrorNacked() {
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        this.errorListener.failed((ApiService.State) null, apiException);
        Truth.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause()).isEqualTo(apiException);
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        AckReplyConsumer ackReplyConsumer = (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class);
        this.messageReceiver.receiveMessage(build, ackReplyConsumer);
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(1))).nack();
    }

    @Test
    public void messagesNackedOnError() {
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        AckReplyConsumer ackReplyConsumer = (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class);
        this.messageReceiver.receiveMessage(build, ackReplyConsumer);
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        this.errorListener.failed((ApiService.State) null, apiException);
        Truth.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause()).isEqualTo(apiException);
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(1))).nack();
    }

    @Test
    public void pullMessagePrioritizeErrorOverExistingMessage() {
        ApiException apiException = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        this.errorListener.failed((ApiService.State) null, apiException);
        this.messageReceiver.receiveMessage(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build(), (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class));
        Truth.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getCause()).isEqualTo(apiException);
    }

    @Test
    public void pullThenAck() throws Exception {
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        Future submit = this.executorService.submit(() -> {
            return (List) this.subscriber.pull().get();
        });
        AckReplyConsumer ackReplyConsumer = (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class);
        this.messageReceiver.receiveMessage(build, ackReplyConsumer);
        List list = (List) submit.get();
        Truth.assertThat(Integer.valueOf(list.size())).isEqualTo(1);
        ReceivedMessage receivedMessage = (ReceivedMessage) list.get(0);
        Truth.assertThat(receivedMessage.getMessage()).isEqualTo(build);
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(0))).ack();
        this.subscriber.ackMessages(ImmutableList.of("not a real ack id", receivedMessage.getAckId())).get();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(1))).ack();
    }

    @Test
    public void multiAck() throws Exception {
        PubsubMessage build = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
        PubsubMessage build2 = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("def")).build();
        AckReplyConsumer ackReplyConsumer = (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class);
        AckReplyConsumer ackReplyConsumer2 = (AckReplyConsumer) Mockito.mock(AckReplyConsumer.class);
        this.messageReceiver.receiveMessage(build, ackReplyConsumer);
        this.messageReceiver.receiveMessage(build2, ackReplyConsumer2);
        List list = (List) this.subscriber.pull().get();
        Truth.assertThat(Integer.valueOf(list.size())).isEqualTo(2);
        ReceivedMessage receivedMessage = (ReceivedMessage) list.get(0);
        ReceivedMessage receivedMessage2 = (ReceivedMessage) list.get(1);
        Truth.assertThat(receivedMessage.getMessage()).isEqualTo(build);
        Truth.assertThat(receivedMessage2.getMessage()).isEqualTo(build2);
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(0))).ack();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer2, Mockito.times(0))).ack();
        this.subscriber.ackMessages(ImmutableList.of(receivedMessage2.getAckId(), receivedMessage.getAckId())).get();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(1))).ack();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer2, Mockito.times(1))).ack();
        this.subscriber.ackMessages(ImmutableList.of(receivedMessage2.getAckId(), receivedMessage.getAckId())).get();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer, Mockito.times(1))).ack();
        ((AckReplyConsumer) Mockito.verify(ackReplyConsumer2, Mockito.times(1))).ack();
    }
}
