package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link SubscriptionPartitionProcessorImpl}.
 *
 * <p>The processor under test is created in {@link #setUp()} from a spied restriction tracker, a
 * mocked output receiver and a mocked subscriber factory. The factory stub records the message
 * consumer it is given in {@code leakedConsumer}, which lets each test deliver messages to the
 * processor directly.
 */
@RunWith(JUnit4.class)
public class SubscriptionPartitionProcessorImplTest {

    @Spy
    RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;

    @Mock
    DoFn.OutputReceiver<SequencedMessage> receiver;

    @Mock
    Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;

    @Spy
    FakeSubscriber subscriber;

    /** Consumer captured from the {@code subscriberFactory} invocation made during set-up. */
    Consumer<List<SequencedMessage>> leakedConsumer;

    /** Processor under test; rebuilt before every test method. */
    SubscriptionPartitionProcessor processor;

    /**
     * Abstract type combining {@link FakeApiService} with the {@link Subscriber} interface so that
     * a single Mockito spy can be handed out by the subscriber factory stub.
     */
    abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}

    /**
     * Builds a 1024-byte message at the given offset.
     *
     * @param offset cursor offset of the message; the publish time is {@code 10000 + offset}
     *     milliseconds
     * @return the constructed message
     */
    private static SequencedMessage messageWithOffset(long offset) {
        return SequencedMessage.newBuilder()
                .setCursor(Cursor.newBuilder().setOffset(offset))
                .setPublishTime(Timestamps.fromMillis(10000 + offset))
                .setSizeBytes(1024L)
                .build();
    }

    /**
     * Returns the restriction reported by the tracker in tests that start the processor: a range
     * from the example offset up to {@link Long#MAX_VALUE}.
     */
    private OffsetByteRange initialRange() {
        long startOffset = UnitTestExamples.example(Offset.class).value();
        return OffsetByteRange.of(new OffsetRange(startOffset, Long.MAX_VALUE));
    }

    /**
     * Initializes the mocks, stubs the factory to capture the consumer and return the spied
     * subscriber, and constructs the processor. Asserts that construction invoked the factory.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(subscriberFactory.apply(ArgumentMatchers.any()))
                .then(
                        invocation -> {
                            leakedConsumer = invocation.getArgument(0);
                            return subscriber;
                        });
        processor =
                new SubscriptionPartitionProcessorImpl(
                        tracker, receiver, subscriberFactory, SubscriberOptions.DEFAULT_FLOW_CONTROL);
        Assert.assertNotNull(leakedConsumer);
    }

    /**
     * {@code start()} starts the subscriber, waits for it to run and grants the default flow
     * control allowance; {@code close()} stops the subscriber and waits for termination.
     */
    @Test
    public void lifecycle() throws Exception {
        Mockito.when(tracker.currentRestriction()).thenReturn(initialRange());
        processor.start();
        Mockito.verify(subscriber).startAsync();
        Mockito.verify(subscriber).awaitRunning();
        FlowControlRequest initialAllowance =
                FlowControlRequest.newBuilder()
                        .setAllowedBytes(SubscriberOptions.DEFAULT_FLOW_CONTROL.bytesOutstanding())
                        .setAllowedMessages(SubscriberOptions.DEFAULT_FLOW_CONTROL.messagesOutstanding())
                        .build();
        Mockito.verify(subscriber).allowFlow(initialAllowance);
        processor.close();
        Mockito.verify(subscriber).stopAsync();
        Mockito.verify(subscriber).awaitTerminated();
    }

    /** A {@link CheckedApiException} thrown by {@code allowFlow} propagates out of {@code start()}. */
    @Test
    public void lifecycleFlowControlThrows() throws Exception {
        Mockito.when(tracker.currentRestriction()).thenReturn(initialRange());
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
                .when(subscriber)
                .allowFlow(ArgumentMatchers.any());
        Assert.assertThrows(CheckedApiException.class, () -> processor.start());
    }

    /**
     * An {@link ApiException} thrown by {@code awaitTerminated} propagates out of {@code close()},
     * after the subscriber was asked to stop.
     */
    @Test
    public void lifecycleSubscriberAwaitThrows() throws Exception {
        Mockito.when(tracker.currentRestriction()).thenReturn(initialRange());
        processor.start();
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.INTERNAL).underlying)
                .when(subscriber)
                .awaitTerminated();
        Assert.assertThrows(ApiException.class, () -> processor.close());
        Mockito.verify(subscriber).stopAsync();
        Mockito.verify(subscriber).awaitTerminated();
    }

    /**
     * Failing the subscriber after start makes {@code waitForCompletion} throw an
     * {@link ApiException} carrying the same status code.
     */
    @Test
    public void subscriberFailureFails() throws Exception {
        Mockito.when(tracker.currentRestriction()).thenReturn(initialRange());
        processor.start();
        subscriber.fail(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE));
        ApiException thrown =
                Assert.assertThrows(
                        ApiException.class,
                        () -> processor.waitForCompletion(Duration.standardSeconds(1L)));
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, thrown.getStatusCode().getCode());
    }

    /**
     * When {@code allowFlow} is stubbed to fail after start and a message is then delivered,
     * {@code waitForCompletion} throws an {@link ApiException} with the stubbed status code.
     */
    @Test
    public void allowFlowFailureFails() throws Exception {
        Mockito.when(tracker.currentRestriction()).thenReturn(initialRange());
        processor.start();
        Mockito.when(tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true);
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
                .when(subscriber)
                .allowFlow(ArgumentMatchers.any());
        leakedConsumer.accept(ImmutableList.of(messageWithOffset(1L)));
        ApiException thrown =
                Assert.assertThrows(
                        ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, thrown.getStatusCode().getCode());
    }

    /** With no messages delivered, waiting times out with {@code resume()} and nothing claimed. */
    @Test
    public void timeoutReturnsResume() {
        Assert.assertEquals(
                DoFn.ProcessContinuation.resume(), processor.waitForCompletion(Duration.millis(10L)));
        Assert.assertFalse(processor.lastClaimed().isPresent());
    }

    /**
     * A rejected claim yields {@code stop()} with nothing claimed, and a later delivery does not
     * cause another {@code tryClaim} call.
     */
    @Test
    public void failedClaimCausesStop() {
        Mockito.when(tracker.tryClaim(ArgumentMatchers.any())).thenReturn(false);
        leakedConsumer.accept(ImmutableList.of(messageWithOffset(1L)));
        Mockito.verify(tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
        Assert.assertEquals(
                DoFn.ProcessContinuation.stop(), processor.waitForCompletion(Duration.millis(10L)));
        Assert.assertFalse(processor.lastClaimed().isPresent());
        leakedConsumer.accept(ImmutableList.of(messageWithOffset(2L)));
        // Still exactly one claim attempt in total: the second batch must not reach the tracker.
        Mockito.verify(tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
    }

    /**
     * A delivered batch is claimed once (last offset, summed bytes), each message is output with
     * its publish time as timestamp, and a flow control allowance matching the batch is granted,
     * in that order. Waiting afterwards times out with {@code resume()} and reports the last
     * claimed offset.
     */
    @Test
    public void successfulClaimThenTimeout() throws Exception {
        Mockito.when(tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true);
        SequencedMessage first = messageWithOffset(1L);
        SequencedMessage second = messageWithOffset(3L);
        long batchBytes = first.getSizeBytes() + second.getSizeBytes();
        leakedConsumer.accept(ImmutableList.of(first, second));
        InOrder order = Mockito.inOrder(tracker, receiver, subscriber);
        order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(3L), batchBytes));
        order.verify(receiver)
                .outputWithTimestamp(first, new Instant(Timestamps.toMillis(first.getPublishTime())));
        order.verify(receiver)
                .outputWithTimestamp(second, new Instant(Timestamps.toMillis(second.getPublishTime())));
        order.verify(subscriber)
                .allowFlow(
                        FlowControlRequest.newBuilder()
                                .setAllowedMessages(2L)
                                .setAllowedBytes(batchBytes)
                                .build());
        Assert.assertEquals(
                DoFn.ProcessContinuation.resume(), processor.waitForCompletion(Duration.millis(10L)));
        // Expected value goes first so a failure message labels the two values correctly.
        Assert.assertEquals(Offset.of(3L), processor.lastClaimed().get());
    }
}
