package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link SubscriptionPartitionProcessorImpl}.
 *
 * <p>The processor under test is wired to a spied {@link RestrictionTracker}, a mocked
 * {@link DoFn.OutputReceiver}, and a mocked subscriber factory that hands out a spied
 * {@link FakeSubscriber}.
 */
@RunWith(JUnit4.class)
public class SubscriptionPartitionProcessorImplTest {
    /** Subscription/partition pair shared by every processor built in these tests. */
    private static final SubscriptionPartition PARTITION =
            SubscriptionPartition.of(
                    UnitTestExamples.example(SubscriptionPath.class),
                    UnitTestExamples.example(Partition.class));

    @Spy
    RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;

    @Mock
    DoFn.OutputReceiver<SequencedMessage> receiver;

    @Mock
    Supplier<MemoryBufferedSubscriber> subscriberFactory;

    /** Fails any single test that runs for longer than 30 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(30);

    @Spy
    FakeSubscriber subscriber;

    /** Abstract base that lets Mockito spy a {@link MemoryBufferedSubscriber} on top of {@link FakeApiService}. */
    abstract static class FakeSubscriber extends FakeApiService implements MemoryBufferedSubscriber {}

    /**
     * Builds a message whose cursor is at {@code offset}, whose publish time is
     * {@code 10000 + offset} milliseconds, and whose size is 1024 bytes.
     *
     * @param offset the cursor offset to assign to the message
     * @return the constructed message
     */
    private static SequencedMessage messageWithOffset(long offset) {
        return SequencedMessage.newBuilder()
                .setCursor(Cursor.newBuilder().setOffset(offset))
                .setPublishTime(Timestamps.fromMillis(10000 + offset))
                .setSizeBytes(1024L)
                .build();
    }

    /**
     * Returns the restriction reported by the tracker in every test: a range starting at the
     * example offset and ending at {@link Long#MAX_VALUE}.
     */
    private OffsetByteRange initialRange() {
        long start = UnitTestExamples.example(Offset.class).value();
        return OffsetByteRange.of(new OffsetRange(start, Long.MAX_VALUE));
    }

    /**
     * Initializes the annotated mocks/spies and installs the default stubbing: the factory returns
     * the spied subscriber, the tracker reports {@link #initialRange()}, and the subscriber is
     * running with its fetch offset equal to the example offset.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.subscriberFactory.get()).thenReturn(this.subscriber);
        Mockito.when(this.tracker.currentRestriction()).thenReturn(initialRange());
        Mockito.doReturn(true).when(this.subscriber).isRunning();
        Mockito.doReturn(UnitTestExamples.example(Offset.class)).when(this.subscriber).fetchOffset();
    }

    /** Creates a processor wired to this test's partition, tracker, receiver and subscriber factory. */
    private SubscriptionPartitionProcessor newProcessor() {
        return new SubscriptionPartitionProcessorImpl(
                PARTITION, this.tracker, this.receiver, this.subscriberFactory);
    }

    /**
     * With default stubbing, creating and running a processor returns {@code resume()} and touches
     * the factory, then {@code fetchOffset()}, then {@code rebuffer()}, in that order.
     */
    @Test
    public void lifecycle() {
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), newProcessor().run());

        InOrder order = Mockito.inOrder(this.subscriberFactory, this.subscriber);
        order.verify(this.subscriberFactory).get();
        order.verify(this.subscriber).fetchOffset();
        order.verify(this.subscriber).rebuffer();
    }

    /**
     * When the first subscriber from the factory reports a fetch offset one past the example
     * offset, it is expected to be stopped and awaited (its {@code awaitTerminated()} failure does
     * not fail the run), after which a second subscriber is requested and used.
     */
    @Test
    public void lifecycleOffsetMismatch() {
        MemoryBufferedSubscriber badSubscriber = Mockito.spy(FakeSubscriber.class);
        long mismatchedOffset = UnitTestExamples.example(Offset.class).value() + 1;
        Mockito.doReturn(Offset.of(mismatchedOffset)).when(badSubscriber).fetchOffset();
        Mockito.doThrow(new RuntimeException("Ignored")).when(badSubscriber).awaitTerminated();
        // First factory call yields the mismatched subscriber, the second yields the default one.
        Mockito.doReturn(badSubscriber, this.subscriber).when(this.subscriberFactory).get();

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), newProcessor().run());

        InOrder order = Mockito.inOrder(this.subscriberFactory, badSubscriber, this.subscriber);
        order.verify(this.subscriberFactory).get();
        order.verify(badSubscriber).fetchOffset();
        order.verify(badSubscriber).stopAsync();
        order.verify(badSubscriber).awaitTerminated();
        order.verify(this.subscriberFactory).get();
        order.verify(this.subscriber).fetchOffset();
        order.verify(this.subscriber).rebuffer();
    }

    /**
     * When {@code rebuffer()} throws the {@link ApiException} underlying an {@code OUT_OF_RANGE}
     * {@link CheckedApiException}, building the processor is expected to throw {@link ApiException}.
     */
    @Test
    public void lifecycleRebufferThrows() throws Exception {
        ApiException failure = new CheckedApiException(StatusCode.Code.OUT_OF_RANGE).underlying;
        Mockito.doThrow(failure).when(this.subscriber).rebuffer();

        Assert.assertThrows(ApiException.class, this::newProcessor);
    }

    /**
     * When the tracker rejects the claim for the first peeked message, the run returns
     * {@code stop()}, exactly one claim is attempted, nothing is popped, and no offset is recorded
     * as last claimed.
     */
    @Test
    public void failedClaimCausesStop() {
        // The processor is deliberately built before the stubbing below is installed.
        SubscriptionPartitionProcessor processor = newProcessor();
        Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(false);
        Mockito.doReturn(Optional.of(messageWithOffset(1L))).when(this.subscriber).peek();

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), processor.run());

        Mockito.verify(this.tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
        Mockito.verify(this.subscriber, Mockito.times(0)).pop();
        Assert.assertFalse(processor.lastClaimed().isPresent());
    }

    /**
     * When two messages are peeked and then the subscriber runs dry, each message is claimed and
     * then emitted with its publish time as timestamp, in order; the run returns {@code resume()}
     * and the last claimed offset is that of the second message.
     */
    @Test
    public void successfulClaimsThenNoMoreMessagesFromSubscriber() {
        Mockito.doReturn(true).when(this.tracker).tryClaim(ArgumentMatchers.any());
        SequencedMessage message1 = messageWithOffset(1L);
        SequencedMessage message3 = messageWithOffset(3L);
        // Successive peek() calls yield message1, message3, then empty.
        Mockito.doReturn(Optional.of(message1), Optional.of(message3), Optional.empty())
                .when(this.subscriber)
                .peek();

        SubscriptionPartitionProcessor processor = newProcessor();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), processor.run());

        InOrder order = Mockito.inOrder(this.tracker, this.receiver);
        order.verify(this.tracker)
                .tryClaim(OffsetByteProgress.of(Offset.of(1L), message1.getSizeBytes()));
        order.verify(this.receiver)
                .outputWithTimestamp(
                        message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
        order.verify(this.tracker)
                .tryClaim(OffsetByteProgress.of(Offset.of(3L), message3.getSizeBytes()));
        order.verify(this.receiver)
                .outputWithTimestamp(
                        message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(Offset.of(3L), processor.lastClaimed().get());
    }
}
