package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteUnboundedReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.class */
public class PubsubLiteUnboundedReaderTest {

    @Mock
    private PullSubscriber subscriber5;

    @Mock
    private PullSubscriber subscriber8;

    @Spy
    private CommitterFakeService committer5;

    @Spy
    private CommitterFakeService committer8;
    private final UnboundedSource<SequencedMessage, ?> source = (UnboundedSource) Mockito.mock(UnboundedSource.class);
    private final PubsubLiteUnboundedReader reader;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest$CommitterFakeService.class */
    static abstract class CommitterFakeService extends FakeApiService implements Committer {
        CommitterFakeService() {
        }
    }

    private static SequencedMessage exampleMessage(Offset offset, Timestamp timestamp) {
        return SequencedMessage.newBuilder().setPublishTime(timestamp).setCursor(Cursor.newBuilder().setOffset(offset.value())).setSizeBytes(100L).build();
    }

    private static Timestamp randomMilliAllignedTimestamp() {
        return Timestamps.fromMillis(new Random().nextInt(Integer.MAX_VALUE));
    }

    private static Instant toInstant(Timestamp timestamp) {
        return new Instant(Timestamps.toMillis(timestamp));
    }

    public PubsubLiteUnboundedReaderTest() throws StatusException {
        MockitoAnnotations.initMocks(this);
        PubsubLiteUnboundedReader.SubscriberState subscriberState = new PubsubLiteUnboundedReader.SubscriberState();
        subscriberState.subscriber = this.subscriber5;
        subscriberState.committer = this.committer5;
        PubsubLiteUnboundedReader.SubscriberState subscriberState2 = new PubsubLiteUnboundedReader.SubscriberState();
        subscriberState2.subscriber = this.subscriber8;
        subscriberState2.committer = this.committer8;
        this.reader = new PubsubLiteUnboundedReader(this.source, ImmutableMap.of(Partition.of(5L), subscriberState, Partition.of(8L), subscriberState2));
    }

    @Test
    public void sourceReturnsSource() {
        MatcherAssert.assertThat(this.reader.getCurrentSource(), Matchers.sameInstance(this.source));
    }

    @Test
    public void startPullsFromAllSubscribers() throws Exception {
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of());
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of());
        Assert.assertFalse(this.reader.start());
        ((PullSubscriber) Mockito.verify(this.subscriber5)).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8)).pull();
        MatcherAssert.assertThat(this.reader.getWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
    }

    @Test
    public void startReturnsTrueIfMessagesExist() throws Exception {
        Timestamp randomMilliAllignedTimestamp = randomMilliAllignedTimestamp();
        SequencedMessage exampleMessage = exampleMessage(Offset.of(10L), randomMilliAllignedTimestamp);
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of(exampleMessage));
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of());
        Assert.assertTrue(this.reader.start());
        ((PullSubscriber) Mockito.verify(this.subscriber5)).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8)).pull();
        MatcherAssert.assertThat(this.reader.getCurrent(), Matchers.equalTo(exampleMessage));
        MatcherAssert.assertThat(this.reader.getWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        MatcherAssert.assertThat(this.reader.getCurrentTimestamp(), Matchers.equalTo(toInstant(randomMilliAllignedTimestamp)));
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
    }

    @Test
    public void advanceSetsWatermarkAfterAllSubscribersPopulated() throws Exception {
        Timestamp randomMilliAllignedTimestamp = randomMilliAllignedTimestamp();
        Timestamp randomMilliAllignedTimestamp2 = randomMilliAllignedTimestamp();
        SequencedMessage exampleMessage = exampleMessage(Offset.of(10L), randomMilliAllignedTimestamp);
        SequencedMessage exampleMessage2 = exampleMessage(Offset.of(888L), randomMilliAllignedTimestamp2);
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of(exampleMessage));
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of(exampleMessage2));
        Assert.assertTrue(this.reader.start());
        ((PullSubscriber) Mockito.verify(this.subscriber5)).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8)).pull();
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
        Mockito.reset(new PullSubscriber[]{this.subscriber5, this.subscriber8});
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.reader.getCurrent());
        MatcherAssert.assertThat(this.reader.getWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        MatcherAssert.assertThat(this.reader.getCurrentTimestamp(), Matchers.equalTo(toInstant(((SequencedMessage) arrayList.get(0)).getPublishTime())));
        Assert.assertTrue(this.reader.advance());
        arrayList.add(this.reader.getCurrent());
        MatcherAssert.assertThat(this.reader.getWatermark(), Matchers.equalTo((Instant) Collections.min(Arrays.asList(toInstant(randomMilliAllignedTimestamp), toInstant(randomMilliAllignedTimestamp2)))));
        MatcherAssert.assertThat(this.reader.getCurrentTimestamp(), Matchers.equalTo(toInstant(((SequencedMessage) arrayList.get(1)).getPublishTime())));
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of());
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of());
        Assert.assertFalse(this.reader.advance());
        ((PullSubscriber) Mockito.verify(this.subscriber5)).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8)).pull();
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
    }

    @Test
    public void multipleMessagesInPullReadsAllBeforeNextPull() throws Exception {
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of(exampleMessage(Offset.of(10L), randomMilliAllignedTimestamp()), exampleMessage(Offset.of(888L), randomMilliAllignedTimestamp()), exampleMessage(Offset.of(999L), randomMilliAllignedTimestamp()))).thenReturn(ImmutableList.of());
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of()).thenReturn(ImmutableList.of());
        Assert.assertTrue(this.reader.start());
        Assert.assertTrue(this.reader.advance());
        Assert.assertTrue(this.reader.advance());
        Assert.assertFalse(this.reader.advance());
        ((PullSubscriber) Mockito.verify(this.subscriber5, Mockito.times(2))).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8, Mockito.times(2))).pull();
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
    }

    @Test
    public void messagesOnSubsequentPullsProcessed() throws Exception {
        SequencedMessage exampleMessage = exampleMessage(Offset.of(10L), randomMilliAllignedTimestamp());
        SequencedMessage exampleMessage2 = exampleMessage(Offset.of(888L), randomMilliAllignedTimestamp());
        SequencedMessage exampleMessage3 = exampleMessage(Offset.of(999L), randomMilliAllignedTimestamp());
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of(exampleMessage)).thenReturn(ImmutableList.of(exampleMessage2)).thenReturn(ImmutableList.of());
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of()).thenReturn(ImmutableList.of(exampleMessage3)).thenReturn(ImmutableList.of());
        Assert.assertTrue(this.reader.start());
        Assert.assertTrue(this.reader.advance());
        Assert.assertTrue(this.reader.advance());
        Assert.assertFalse(this.reader.advance());
        ((PullSubscriber) Mockito.verify(this.subscriber5, Mockito.times(3))).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8, Mockito.times(3))).pull();
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
    }

    @Test
    public void checkpointMarkFinalizeCommits() throws Exception {
        Mockito.when(this.subscriber5.pull()).thenReturn(ImmutableList.of(exampleMessage(Offset.of(10L), randomMilliAllignedTimestamp())));
        Mockito.when(this.subscriber8.pull()).thenReturn(ImmutableList.of());
        Assert.assertTrue(this.reader.start());
        ((PullSubscriber) Mockito.verify(this.subscriber5)).pull();
        ((PullSubscriber) Mockito.verify(this.subscriber8)).pull();
        Mockito.verifyNoMoreInteractions(new Object[]{this.subscriber5, this.subscriber8});
        UnboundedSource.CheckpointMark checkpointMark = this.reader.getCheckpointMark();
        Mockito.when(this.committer5.commitOffset(Offset.of(10L))).thenReturn(ApiFutures.immediateFuture((Object) null));
        checkpointMark.finalizeCheckpoint();
        ((CommitterFakeService) Mockito.verify(this.committer5)).commitOffset(Offset.of(10L));
    }
}
