/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriberImpl;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class MemoryBufferedSubscriberImplTest {
    private static final long MAX_MEMORY = 1024L;
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)30L);
    @Spy
    FakeSubscriber subscriber;
    @Mock
    Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
    @Mock
    MemoryLimiter limiter;
    @Mock
    MemoryLimiter.Block block;
    MemoryBufferedSubscriber bufferedSubscriber;
    Consumer<List<SequencedMessage>> consumer;

    private static SequencedMessage messageWithSize(long size) {
        return SequencedMessage.newBuilder().setSizeBytes(size).build();
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks((Object)this);
        ((Function)Mockito.doAnswer(args -> {
            this.consumer = (Consumer)args.getArgument(0);
            return this.subscriber;
        }).when(this.subscriberFactory)).apply((Consumer)ArgumentMatchers.any());
        ((MemoryLimiter)Mockito.doReturn((Object)1L).when((Object)this.limiter)).minBlockSize();
        ((MemoryLimiter)Mockito.doReturn((Object)1024L).when((Object)this.limiter)).maxBlockSize();
        Preconditions.checkNotNull((Object)this.block);
        Preconditions.checkNotNull((Object)this.limiter);
        ((MemoryLimiter)Mockito.doReturn((Object)this.block).when((Object)this.limiter)).claim(ArgumentMatchers.anyLong());
        ((MemoryLimiter.Block)Mockito.doReturn((Object)1024L).when((Object)this.block)).claimed();
        this.bufferedSubscriber = new MemoryBufferedSubscriberImpl((Partition)UnitTestExamples.example(Partition.class), (Offset)UnitTestExamples.example(Offset.class), this.limiter, this.subscriberFactory);
        Preconditions.checkNotNull(this.consumer);
        this.bufferedSubscriber.startAsync().awaitRunning();
        ((FakeSubscriber)((Object)Mockito.verify((Object)((Object)this.subscriber)))).startAsync();
        Assert.assertTrue((boolean)this.subscriber.isRunning());
    }

    @Test
    public void underlyingFailureFails() {
        this.subscriber.fail(new RuntimeException("bad"));
        Assert.assertThrows(Exception.class, () -> ((FakeSubscriber)this.subscriber).awaitTerminated());
    }

    @Test
    public void rebufferReducesToOutstandingWhenLittleData() {
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(256L)));
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter.Block)Mockito.verify((Object)this.block)).close();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter)).claim(768L);
    }

    @Test
    public void rebufferCannotGoBelowMin() {
        long minBlock = 819L;
        ((MemoryLimiter)Mockito.doReturn((Object)minBlock).when((Object)this.limiter)).minBlockSize();
        for (int i = 0; i < 1000; ++i) {
            this.bufferedSubscriber.rebuffer();
        }
        Mockito.reset((Object[])new MemoryLimiter[]{this.limiter});
        ((MemoryLimiter)Mockito.doReturn((Object)minBlock).when((Object)this.limiter)).minBlockSize();
        ((MemoryLimiter)Mockito.doReturn((Object)this.block).when((Object)this.limiter)).claim(ArgumentMatchers.anyLong());
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(409L)));
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter)).claim(minBlock);
    }

    @Test
    public void rebufferStaysSameOnHalfDelivered() {
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(256L)));
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter)).claim(768L);
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(384L)));
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter)).claim(768L);
    }

    @Test
    public void rebufferGrowsOnMoreDelivered() {
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(256L)));
        this.bufferedSubscriber.pop();
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter)).claim(768L);
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)MemoryBufferedSubscriberImplTest.messageWithSize(512L), (Object)MemoryBufferedSubscriberImplTest.messageWithSize(128L)));
        this.bufferedSubscriber.rebuffer();
        ((MemoryLimiter)Mockito.verify((Object)this.limiter, (VerificationMode)Mockito.times((int)2))).claim(1024L);
    }

    @Test
    public void dataAvailableToCaller() {
        SequencedMessage message1 = SequencedMessage.newBuilder().setCursor(Cursor.newBuilder().setOffset(((Offset)UnitTestExamples.example(Offset.class)).value() + 10L)).setSizeBytes(1L).build();
        SequencedMessage message2 = SequencedMessage.newBuilder().setCursor(Cursor.newBuilder().setOffset(((Offset)UnitTestExamples.example(Offset.class)).value() + 20L)).setSizeBytes(1L).build();
        Assert.assertFalse((boolean)this.bufferedSubscriber.peek().isPresent());
        this.consumer.accept((List<SequencedMessage>)ImmutableList.of((Object)message1, (Object)message2));
        Assert.assertEquals((Object)this.bufferedSubscriber.fetchOffset(), (Object)UnitTestExamples.example(Offset.class));
        Assert.assertEquals(this.bufferedSubscriber.peek().get(), (Object)message1);
        this.bufferedSubscriber.pop();
        Assert.assertEquals((Object)this.bufferedSubscriber.fetchOffset(), (Object)Offset.of((long)(message1.getCursor().getOffset() + 1L)));
        Assert.assertEquals(this.bufferedSubscriber.peek().get(), (Object)message2);
        this.bufferedSubscriber.pop();
        Assert.assertEquals((Object)this.bufferedSubscriber.fetchOffset(), (Object)Offset.of((long)(message2.getCursor().getOffset() + 1L)));
    }

    static abstract class FakeSubscriber
    extends FakeApiService
    implements Subscriber {
        FakeSubscriber() {
        }
    }
}

