package com.google.cloud.pubsublite.beam;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/pubsublite/beam/BufferingPullSubscriberTest.class */
public class BufferingPullSubscriberTest {
    private final SubscriberFactory underlyingFactory = (SubscriberFactory) Mockito.mock(SubscriberFactory.class);
    private final Subscriber underlying = (Subscriber) Mockito.mock(Subscriber.class);
    private final Offset initialOffset = Offset.of(5);
    private final FlowControlSettings flowControlSettings;
    private PullSubscriber<SequencedMessage> subscriber;
    private Consumer<ImmutableList<SequencedMessage>> messageConsumer;
    private ApiService.Listener errorListener;

    public BufferingPullSubscriberTest() {
        Supplier supplier = () -> {
            try {
                return FlowControlSettings.builder().setBytesOutstanding(10L).setMessagesOutstanding(20L).build();
            } catch (StatusException e) {
                throw e.getStatus().asRuntimeException();
            }
        };
        this.flowControlSettings = (FlowControlSettings) supplier.get();
    }

    @Before
    public void setUp() throws Exception {
        Mockito.when(this.underlying.startAsync()).thenReturn(this.underlying);
        SeekRequest build = SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(this.initialOffset.value()).build()).build();
        Mockito.when(this.underlying.seek(build)).thenReturn(ApiFutures.immediateFuture(this.initialOffset));
        FlowControlRequest build2 = FlowControlRequest.newBuilder().setAllowedBytes(this.flowControlSettings.bytesOutstanding()).setAllowedMessages(this.flowControlSettings.messagesOutstanding()).build();
        Mockito.when(this.underlyingFactory.New((Consumer) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            this.messageConsumer = (Consumer) invocationOnMock.getArgument(0);
            return this.underlying;
        });
        ((Subscriber) Mockito.doAnswer(invocationOnMock2 -> {
            this.errorListener = (ApiService.Listener) invocationOnMock2.getArgument(0);
            return null;
        }).when(this.underlying)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
        this.subscriber = new BufferingPullSubscriber(this.underlyingFactory, this.flowControlSettings, this.initialOffset);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.underlyingFactory, this.underlying});
        ((SubscriberFactory) inOrder.verify(this.underlyingFactory)).New((Consumer) ArgumentMatchers.any());
        ((Subscriber) inOrder.verify(this.underlying)).addListener((ApiService.Listener) ArgumentMatchers.any(), (Executor) ArgumentMatchers.any());
        ((Subscriber) inOrder.verify(this.underlying)).startAsync();
        ((Subscriber) inOrder.verify(this.underlying)).awaitRunning();
        ((Subscriber) inOrder.verify(this.underlying)).allowFlow(build2);
        ((Subscriber) inOrder.verify(this.underlying)).seek(build);
        Truth.assertThat(this.messageConsumer).isNotNull();
        Truth.assertThat(this.errorListener).isNotNull();
    }

    @Test
    public void createDestroy() {
    }

    @Test
    public void pullAfterErrorThrows() {
        this.errorListener.failed((ApiService.State) null, Status.INTERNAL.asException());
        PullSubscriber<SequencedMessage> pullSubscriber = this.subscriber;
        pullSubscriber.getClass();
        Truth.assertThat(Assert.assertThrows(StatusException.class, pullSubscriber::pull).getStatus().getCode()).isEqualTo(Status.Code.INTERNAL);
    }

    @Test
    public void emptyPull() throws StatusException {
        Truth.assertThat(this.subscriber.pull()).isEmpty();
    }

    @Test
    public void pullEmptiesForNext() throws StatusException {
        SequencedMessage of = SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(10L), 10L);
        SequencedMessage of2 = SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(11L), 10L);
        this.messageConsumer.accept(ImmutableList.of(of, of2));
        Truth.assertThat(this.subscriber.pull()).containsExactly(new Object[]{of, of2});
        Truth.assertThat(this.subscriber.pull()).isEmpty();
    }

    @Test
    public void multipleBatchesAggregatedReturnsTokens() throws StatusException {
        SequencedMessage of = SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(10L), 10L);
        SequencedMessage of2 = SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(11L), 20L);
        SequencedMessage of3 = SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12L), 30L);
        this.messageConsumer.accept(ImmutableList.of(of, of2));
        this.messageConsumer.accept(ImmutableList.of(of3));
        Truth.assertThat(this.subscriber.pull()).containsExactly(new Object[]{of, of2, of3});
        Truth.assertThat(this.subscriber.pull()).isEmpty();
        ((Subscriber) Mockito.verify(this.underlying)).allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(3L).setAllowedBytes(60L).build());
    }

    @Test
    public void closeStops() throws Exception {
        Mockito.when(this.underlying.stopAsync()).thenReturn(this.underlying);
        this.subscriber.close();
        ((Subscriber) Mockito.verify(this.underlying)).stopAsync();
        ((Subscriber) Mockito.verify(this.underlying)).awaitTerminated();
    }
}
