package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest.class */
public class UnboundedReaderImplTest {
    private static final Offset INITIAL_OFFSET = Offset.of(1);

    @Rule
    public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    @Spy
    private UnboundedSource<SequencedMessage, CheckpointMarkImpl> source;

    @Spy
    private FakeSubscriber subscriber;

    @Mock
    private TopicBacklogReader backlogReader;

    @Mock
    private BlockingCommitter committer;
    private UnboundedReaderImpl reader;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/UnboundedReaderImplTest$FakeSubscriber.class */
    public static abstract class FakeSubscriber extends FakeApiService implements MemoryBufferedSubscriber {
        FakeSubscriber() {
        }
    }

    private static SequencedMessage messageWith(String str, long j, Instant instant) {
        return com.google.cloud.pubsublite.SequencedMessage.of(Message.builder().setData(ByteString.copyFromUtf8(str)).build(), Timestamps.fromMillis(instant.getMillis()), Offset.of(j), 21L).toProto();
    }

    private void startSubscriber() throws Exception {
        ((FakeSubscriber) Mockito.doReturn(Optional.empty()).when(this.subscriber)).peek();
        Assert.assertFalse(this.reader.start());
        ((FakeSubscriber) Mockito.verify(this.subscriber)).startAsync();
        ((FakeSubscriber) Mockito.verify(this.subscriber)).awaitRunning(1L, TimeUnit.MINUTES);
    }

    private void advancePastMessage(long j) throws Exception {
        ((FakeSubscriber) Mockito.doReturn(Optional.of(messageWith("abc", j, Instant.now()))).when(this.subscriber)).peek();
        Assert.assertTrue(this.reader.advance());
        ((FakeSubscriber) Mockito.doAnswer(invocationOnMock -> {
            ((FakeSubscriber) Mockito.doReturn(Optional.empty()).when(this.subscriber)).peek();
            return null;
        }).when(this.subscriber)).pop();
        Assert.assertFalse(this.reader.advance());
    }

    @Before
    public void setUp() {
        ((FakeSubscriber) Mockito.doReturn(INITIAL_OFFSET).when(this.subscriber)).fetchOffset();
        this.reader = new UnboundedReaderImpl(this.source, this.subscriber, this.backlogReader, () -> {
            return this.committer;
        }, Offset.of(1L));
    }

    @Test
    public void startAdvances() throws Exception {
        Instant now = Instant.now();
        SequencedMessage messageWith = messageWith("abc", 2L, now);
        ((FakeSubscriber) Mockito.doReturn(Optional.of(messageWith)).when(this.subscriber)).peek();
        Assert.assertTrue(this.reader.start());
        ((FakeSubscriber) Mockito.verify(this.subscriber)).startAsync();
        ((FakeSubscriber) Mockito.verify(this.subscriber)).awaitRunning(1L, TimeUnit.MINUTES);
        Assert.assertEquals(this.reader.getCurrent(), messageWith);
        Assert.assertEquals(this.reader.getWatermark(), now);
    }

    @Test
    public void startAdvancesNoMessage() throws Exception {
        ((FakeSubscriber) Mockito.doReturn(Optional.empty()).when(this.subscriber)).peek();
        Assert.assertFalse(this.reader.start());
        ((FakeSubscriber) Mockito.verify(this.subscriber)).startAsync();
        ((FakeSubscriber) Mockito.verify(this.subscriber)).awaitRunning(1L, TimeUnit.MINUTES);
        UnboundedReaderImpl unboundedReaderImpl = this.reader;
        Objects.requireNonNull(unboundedReaderImpl);
        Assert.assertThrows(NoSuchElementException.class, unboundedReaderImpl::getCurrent);
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, this.reader.getWatermark());
    }

    @Test
    public void advanceNoPreviousValue() throws Exception {
        startSubscriber();
        ((FakeSubscriber) Mockito.doReturn(Optional.of(messageWith("abc", 2L, Instant.now()))).when(this.subscriber)).peek();
        Assert.assertTrue(this.reader.advance());
        ((FakeSubscriber) Mockito.verify(this.subscriber, Mockito.times(0))).pop();
    }

    @Test
    public void advanceWithPreviousValue() throws Exception {
        startSubscriber();
        Instant now = Instant.now();
        Instant now2 = Instant.now();
        SequencedMessage messageWith = messageWith("abc", 2L, now);
        SequencedMessage messageWith2 = messageWith("def", 3L, now2);
        ((FakeSubscriber) Mockito.doReturn(Optional.of(messageWith)).when(this.subscriber)).peek();
        Assert.assertTrue(this.reader.advance());
        Assert.assertEquals(this.reader.getCurrent(), messageWith);
        Assert.assertEquals(this.reader.getCurrentTimestamp(), now);
        Assert.assertEquals(this.reader.getWatermark(), now);
        ((FakeSubscriber) Mockito.doAnswer(invocationOnMock -> {
            ((FakeSubscriber) Mockito.doReturn(Optional.of(messageWith2)).when(this.subscriber)).peek();
            return null;
        }).when(this.subscriber)).pop();
        Assert.assertTrue(this.reader.advance());
        ((FakeSubscriber) Mockito.verify(this.subscriber)).pop();
        Assert.assertEquals(this.reader.getCurrent(), messageWith2);
        Assert.assertEquals(this.reader.getCurrentTimestamp(), now2);
        Assert.assertEquals(this.reader.getWatermark(), now);
    }

    @Test
    public void advanceSubscriberNotRunningThrows() throws Exception {
        startSubscriber();
        this.subscriber.fail(new RuntimeException("I failed"));
        UnboundedReaderImpl unboundedReaderImpl = this.reader;
        Objects.requireNonNull(unboundedReaderImpl);
        Assert.assertThrows(IOException.class, unboundedReaderImpl::advance);
    }

    @Test
    public void getCheckpointMark() throws Exception {
        startSubscriber();
        advancePastMessage(2L);
        CheckpointMarkImpl checkpointMark = this.reader.getCheckpointMark();
        ((FakeSubscriber) Mockito.verify(this.subscriber)).rebuffer();
        Assert.assertEquals(3L, checkpointMark.offset.value());
    }

    @Test
    public void getSplitBacklogBytes() throws Exception {
        startSubscriber();
        advancePastMessage(2L);
        ((TopicBacklogReader) Mockito.doReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(42L).build()).when(this.backlogReader)).computeMessageStats(Offset.of(3L));
        Assert.assertEquals(42L, this.reader.getSplitBacklogBytes());
    }

    @Test
    public void closeClosesAll() throws Exception {
        startSubscriber();
        ((FakeSubscriber) Mockito.doThrow(new Throwable[]{new IllegalStateException("abc")}).when(this.subscriber)).awaitTerminated(1L, TimeUnit.MINUTES);
        ((TopicBacklogReader) Mockito.doThrow(new Throwable[]{new IllegalStateException("def")}).when(this.backlogReader)).close();
        UnboundedReaderImpl unboundedReaderImpl = this.reader;
        Objects.requireNonNull(unboundedReaderImpl);
        Assert.assertThrows(IOException.class, unboundedReaderImpl::close);
        ((FakeSubscriber) Mockito.verify(this.subscriber)).stopAsync();
        ((FakeSubscriber) Mockito.verify(this.subscriber)).awaitTerminated(1L, TimeUnit.MINUTES);
        ((TopicBacklogReader) Mockito.verify(this.backlogReader)).close();
    }
}
