package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link MemoryBufferedSubscriberImpl}.
 *
 * <p>The subscriber under test is built in {@link #setUp()} from a mocked subscriber factory that
 * returns a spied {@link FakeSubscriber}, and a mocked {@link MemoryLimiter} whose
 * {@code claim(...)} always returns the same mocked {@link MemoryLimiter.Block}. Tests push
 * messages through the {@link Consumer} captured from the factory call and then verify which block
 * sizes are requested from the limiter.
 *
 * <p>Message and block sizes are written as fractions of {@link #MAX_MEMORY}; with
 * {@code MAX_MEMORY == 1024} they evaluate to the whole numbers noted next to each expression.
 */
@RunWith(JUnit4.class)
public class MemoryBufferedSubscriberImplTest {
    /** Value reported by the mocked limiter's {@code maxBlockSize()} and the mocked block's {@code claimed()}. */
    private static final long MAX_MEMORY = 1024;

    /** Fails any individual test that runs longer than 30 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(30);

    /** Spied underlying subscriber returned by {@link #subscriberFactory}. */
    @Spy
    FakeSubscriber subscriber;

    /** Mocked factory; stubbed in {@link #setUp()} to capture the message consumer it is given. */
    @Mock
    Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;

    @Mock
    MemoryLimiter limiter;

    /** The single block returned by every {@code limiter.claim(...)} call. */
    @Mock
    MemoryLimiter.Block block;

    /** Object under test; created in {@link #setUp()}. */
    MemoryBufferedSubscriber bufferedSubscriber;

    /** Message consumer captured from the factory invocation; assigned in {@link #setUp()}. */
    Consumer<List<SequencedMessage>> consumer;

    /** Abstract fake that Mockito spies on; combines {@link FakeApiService} with {@link Subscriber}. */
    abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}

    /** Builds a message whose only populated field is {@code size_bytes}. */
    private static SequencedMessage messageWithSize(long sizeBytes) {
        return SequencedMessage.newBuilder().setSizeBytes(sizeBytes).build();
    }

    /**
     * Wires the mocks, constructs the subscriber under test, starts it, and checks that the
     * underlying subscriber was started and is running.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);

        // Capture the consumer handed to the factory so tests can deliver messages directly.
        Mockito.doAnswer(invocation -> {
            this.consumer = invocation.getArgument(0);
            return this.subscriber;
        }).when(this.subscriberFactory).apply(ArgumentMatchers.any());

        Mockito.doReturn(1L).when(this.limiter).minBlockSize();
        Mockito.doReturn(MAX_MEMORY).when(this.limiter).maxBlockSize();
        Preconditions.checkNotNull(this.block);
        Preconditions.checkNotNull(this.limiter);
        Mockito.doReturn(this.block).when(this.limiter).claim(ArgumentMatchers.anyLong());
        Mockito.doReturn(MAX_MEMORY).when(this.block).claimed();

        this.bufferedSubscriber =
            new MemoryBufferedSubscriberImpl(
                UnitTestExamples.example(Partition.class),
                UnitTestExamples.example(Offset.class),
                this.limiter,
                this.subscriberFactory);

        // The constructor is expected to have invoked the factory, which sets the consumer.
        Preconditions.checkNotNull(this.consumer);
        this.bufferedSubscriber.startAsync().awaitRunning();
        Mockito.verify(this.subscriber).startAsync();
        Assert.assertTrue(this.subscriber.isRunning());
    }

    /** Fails the underlying fake subscriber and expects awaiting its termination to throw. */
    @Test
    public void underlyingFailureFails() {
        this.subscriber.fail(new RuntimeException("bad"));
        // NOTE(review): this awaits the fake itself, not bufferedSubscriber, so it does not show
        // that the failure propagates to the subscriber under test - confirm this is intended.
        Assert.assertThrows(Exception.class, this.subscriber::awaitTerminated);
    }

    /**
     * Delivers and pops a single quarter-size message, rebuffers, and verifies that the current
     * block is closed and a three-quarter-size block is claimed.
     */
    @Test
    public void rebufferReducesToOutstandingWhenLittleData() {
        this.consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4))); // 256
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        Mockito.verify(this.block).close();
        Mockito.verify(this.limiter).claim(MAX_MEMORY * 3 / 4); // 768
    }

    /**
     * With the limiter's minimum block size raised, rebuffers many times, then verifies that a
     * rebuffer after a small delivery claims exactly the minimum block size.
     */
    @Test
    public void rebufferCannotGoBelowMin() {
        final long minBlockSize = MAX_MEMORY * 8 / 10; // 819
        Mockito.doReturn(minBlockSize).when(this.limiter).minBlockSize();
        // Rebuffer repeatedly with nothing delivered; presumably this drives the target size
        // down as far as the implementation allows - TODO confirm against the implementation.
        for (int i = 0; i < 1000; i++) {
            this.bufferedSubscriber.rebuffer();
        }

        // Drop the interactions recorded so far; reset also clears stubbing, so re-stub.
        Mockito.reset(this.limiter);
        Mockito.doReturn(minBlockSize).when(this.limiter).minBlockSize();
        Mockito.doReturn(this.block).when(this.limiter).claim(ArgumentMatchers.anyLong());

        this.consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY * 4 / 10))); // 409
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        Mockito.verify(this.limiter).claim(minBlockSize);
    }

    /**
     * After shrinking to a three-quarter-size block, delivers and pops a 384-byte message,
     * rebuffers again, and verifies the three-quarter-size claim still happened exactly once.
     */
    @Test
    public void rebufferStaysSameOnHalfDelivered() {
        final long reducedBlockSize = MAX_MEMORY * 3 / 4; // 768

        this.consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4))); // 256
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        Mockito.verify(this.limiter).claim(reducedBlockSize);

        this.consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY * 3 / 8))); // 384
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        // verify(...) defaults to times(1): the second rebuffer must not have claimed 768 again.
        Mockito.verify(this.limiter).claim(reducedBlockSize);
    }

    /**
     * After shrinking to a three-quarter-size block, delivers 640 bytes without popping,
     * rebuffers, and verifies that a full {@link #MAX_MEMORY} block has been claimed twice overall.
     */
    @Test
    public void rebufferGrowsOnMoreDelivered() {
        this.consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4))); // 256
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        Mockito.verify(this.limiter).claim(MAX_MEMORY * 3 / 4); // 768

        this.consumer.accept(
            ImmutableList.of(
                messageWithSize(MAX_MEMORY / 2), // 512
                messageWithSize(MAX_MEMORY / 8))); // 128
        this.bufferedSubscriber.rebuffer();
        Mockito.verify(this.limiter, Mockito.times(2)).claim(MAX_MEMORY);
    }

    /**
     * Delivers two messages and checks that {@code peek()}/{@code pop()} expose them in order and
     * that {@code fetchOffset()} moves to one past the offset of each popped message.
     */
    @Test
    public void dataAvailableToCaller() {
        final Offset initialOffset = UnitTestExamples.example(Offset.class);
        SequencedMessage first =
            SequencedMessage.newBuilder()
                .setCursor(Cursor.newBuilder().setOffset(initialOffset.value() + 10))
                .setSizeBytes(1L)
                .build();
        SequencedMessage second =
            SequencedMessage.newBuilder()
                .setCursor(Cursor.newBuilder().setOffset(initialOffset.value() + 20))
                .setSizeBytes(1L)
                .build();

        Assert.assertFalse(this.bufferedSubscriber.peek().isPresent());
        this.consumer.accept(ImmutableList.of(first, second));

        // Nothing popped yet, so the fetch offset is still the initial one.
        Assert.assertEquals(initialOffset, this.bufferedSubscriber.fetchOffset());
        Assert.assertEquals(first, this.bufferedSubscriber.peek().get());

        this.bufferedSubscriber.pop();
        Assert.assertEquals(
            Offset.of(first.getCursor().getOffset() + 1), this.bufferedSubscriber.fetchOffset());
        Assert.assertEquals(second, this.bufferedSubscriber.peek().get());

        this.bufferedSubscriber.pop();
        Assert.assertEquals(
            Offset.of(second.getCursor().getOffset() + 1), this.bufferedSubscriber.fetchOffset());
    }
}
