/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteRange;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessorImpl;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link SubscriptionPartitionProcessorImpl}.
 *
 * <p>The processor is built from a spied {@link RestrictionTracker}, a mocked
 * {@link DoFn.OutputReceiver} and a mocked subscriber factory that hands out a spied
 * {@link FakeSubscriber}. Each test stubs the collaborators, drives the processor and then
 * verifies the interactions and the returned {@link DoFn.ProcessContinuation}.
 */
@RunWith(JUnit4.class)
public class SubscriptionPartitionProcessorImplTest {
    /** Subscription/partition pair built from the library's canonical example values. */
    private static final SubscriptionPartition PARTITION =
        SubscriptionPartition.of(
            UnitTestExamples.example(SubscriptionPath.class),
            UnitTestExamples.example(Partition.class));

    @Spy
    RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
    @Mock
    DoFn.OutputReceiver<SequencedMessage> receiver;
    @Mock
    Supplier<MemoryBufferedSubscriber> subscriberFactory;
    /** Fails any single test that runs for longer than 30 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(30L);
    @Spy
    FakeSubscriber subscriber;

    /**
     * Builds a 1024-byte message whose cursor is at {@code offset} and whose publish time is
     * {@code 10000 + offset} milliseconds since the epoch.
     *
     * @param offset the cursor offset to assign to the message
     * @return the constructed message
     */
    private static SequencedMessage messageWithOffset(long offset) {
        return SequencedMessage.newBuilder()
            .setCursor(Cursor.newBuilder().setOffset(offset))
            .setPublishTime(Timestamps.fromMillis(10000L + offset))
            .setSizeBytes(1024L)
            .build();
    }

    /**
     * Returns the restriction reported by the tracker in every test: a range that starts at the
     * example offset and ends at {@link Long#MAX_VALUE}.
     */
    private OffsetByteRange initialRange() {
        long startOffset = UnitTestExamples.example(Offset.class).value();
        return OffsetByteRange.of(new OffsetRange(startOffset, Long.MAX_VALUE));
    }

    /**
     * Initializes the annotated mocks/spies and installs the default stubbing: the factory returns
     * {@link #subscriber}, the tracker reports {@link #initialRange()}, and the subscriber is
     * running and positioned at the example offset.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.subscriberFactory.get()).thenReturn(this.subscriber);
        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.initialRange());
        Mockito.doReturn(true).when(this.subscriber).isRunning();
        Mockito.doReturn(UnitTestExamples.example(Offset.class)).when(this.subscriber).fetchOffset();
    }

    /** Creates the processor under test, wired to this test's collaborators. */
    private SubscriptionPartitionProcessor newProcessor() {
        return new SubscriptionPartitionProcessorImpl(
            PARTITION, this.tracker, this.receiver, this.subscriberFactory);
    }

    /**
     * With default stubbing, running the processor resumes, and the factory is consulted before
     * the subscriber's offset is fetched and the subscriber is rebuffered.
     */
    @Test
    public void lifecycle() {
        SubscriptionPartitionProcessor processor = this.newProcessor();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), processor.run());

        InOrder order = Mockito.inOrder(this.subscriberFactory, this.subscriber);
        order.verify(this.subscriberFactory).get();
        order.verify(this.subscriber).fetchOffset();
        order.verify(this.subscriber).rebuffer();
    }

    /**
     * The first subscriber handed out by the factory reports an offset one past the example
     * offset. The test verifies that it is stopped and awaited (its {@code awaitTerminated()}
     * failure must not surface to the caller), and that a second subscriber is then requested
     * from the factory and used.
     */
    @Test
    public void lifecycleOffsetMismatch() {
        MemoryBufferedSubscriber badSubscriber = Mockito.spy(FakeSubscriber.class);
        Offset mismatchedOffset = Offset.of(UnitTestExamples.example(Offset.class).value() + 1L);
        Mockito.doReturn(mismatchedOffset).when(badSubscriber).fetchOffset();
        Mockito.doThrow(new RuntimeException("Ignored")).when(badSubscriber).awaitTerminated();
        // First factory call yields the mismatched subscriber, the next one the good subscriber.
        Mockito.doReturn(badSubscriber, this.subscriber).when(this.subscriberFactory).get();

        SubscriptionPartitionProcessor processor = this.newProcessor();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), processor.run());

        InOrder order = Mockito.inOrder(this.subscriberFactory, badSubscriber, this.subscriber);
        order.verify(this.subscriberFactory).get();
        order.verify(badSubscriber).fetchOffset();
        order.verify(badSubscriber).stopAsync();
        order.verify(badSubscriber).awaitTerminated();
        order.verify(this.subscriberFactory).get();
        order.verify(this.subscriber).fetchOffset();
        order.verify(this.subscriber).rebuffer();
    }

    /**
     * When {@code rebuffer()} throws an {@link ApiException}, constructing the processor throws
     * that exception type.
     */
    @Test
    public void lifecycleRebufferThrows() throws Exception {
        ApiException rebufferFailure =
            new CheckedApiException(StatusCode.Code.OUT_OF_RANGE).underlying;
        Mockito.doThrow(rebufferFailure).when(this.subscriber).rebuffer();
        Assert.assertThrows(ApiException.class, this::newProcessor);
    }

    /**
     * When the tracker refuses the claim for the first peeked message, the processor stops
     * without popping the message and records no last-claimed offset.
     */
    @Test
    public void failedClaimCausesStop() {
        SubscriptionPartitionProcessor processor = this.newProcessor();
        Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(false);
        Mockito.doReturn(Optional.of(messageWithOffset(1L))).when(this.subscriber).peek();

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), processor.run());

        Mockito.verify(this.tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
        Mockito.verify(this.subscriber, Mockito.never()).pop();
        Assert.assertFalse(processor.lastClaimed().isPresent());
    }

    /**
     * When two messages are claimed successfully and the subscriber then has nothing more to
     * offer, the processor resumes; each message is claimed and then emitted with its publish
     * time, in order, and the last-claimed offset is that of the second message.
     */
    @Test
    public void successfulClaimsThenNoMoreMessagesFromSubscriber() {
        Mockito.doReturn(true).when(this.tracker).tryClaim(ArgumentMatchers.any());
        SequencedMessage message1 = messageWithOffset(1L);
        SequencedMessage message3 = messageWithOffset(3L);
        // Successive peeks yield message1, then message3, then an empty result.
        Mockito.doReturn(Optional.of(message1), Optional.of(message3), Optional.empty())
            .when(this.subscriber)
            .peek();

        SubscriptionPartitionProcessor processor = this.newProcessor();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), processor.run());

        InOrder order = Mockito.inOrder(this.tracker, this.receiver);
        order.verify(this.tracker)
            .tryClaim(OffsetByteProgress.of(Offset.of(1L), message1.getSizeBytes()));
        order.verify(this.receiver)
            .outputWithTimestamp(
                message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
        order.verify(this.tracker)
            .tryClaim(OffsetByteProgress.of(Offset.of(3L), message3.getSizeBytes()));
        order.verify(this.receiver)
            .outputWithTimestamp(
                message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));

        // Check presence first so an empty result fails as an assertion, then compare in
        // (expected, actual) order so a failure message labels the values correctly.
        Assert.assertTrue(processor.lastClaimed().isPresent());
        Assert.assertEquals(Offset.of(3L), processor.lastClaimed().get());
    }

    /**
     * Abstract base used as the spy target for subscribers: combines {@link FakeApiService}
     * with the {@link MemoryBufferedSubscriber} interface so tests can stub individual methods.
     */
    static abstract class FakeSubscriber
    extends FakeApiService
    implements MemoryBufferedSubscriber {
        FakeSubscriber() {
        }
    }
}

