/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.pubsublite.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessorImpl;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class SubscriptionPartitionProcessorImplTest {
    // Spied restriction tracker handed to the processor under test; individual tests stub
    // currentRestriction() and tryClaim() on it.
    @Spy
    @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress> tracker;
    // Mocked output receiver handed to the processor under test.
    @Mock
    // Decompiler note: the outer class could not be loaded, so the placement of the type
    // annotations on the nested type below may be incorrect.
     @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized SequencedMessage> receiver;
    // Mocked factory handed to the processor; setUp() stubs it to record the message consumer
    // it is called with and to return the spied subscriber.
    @Mock
    @UnknownKeyFor @NonNull @Initialized Function<@UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized SequencedMessage>>, @UnknownKeyFor @NonNull @Initialized Subscriber> subscriberFactory;
    // Spied fake subscriber that the stubbed factory returns.
    @Spy
    @UnknownKeyFor @NonNull @Initialized FakeSubscriber subscriber;
    // Consumer captured from the factory invocation in setUp(); tests call accept() on it to
    // deliver messages as if they came from the subscriber.
    @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized SequencedMessage>> leakedConsumer;
    // Instance under test, created fresh in setUp().
    @UnknownKeyFor @NonNull @Initialized SubscriptionPartitionProcessor processor;

    /**
     * Builds a test message positioned at {@code offset}.
     *
     * <p>The message reports a size of 1024 bytes and a publish time of {@code 10000 + offset}
     * milliseconds since the epoch.
     *
     * @param offset cursor offset to assign to the message
     * @return the fully built message
     */
    private static @UnknownKeyFor @NonNull @Initialized SequencedMessage messageWithOffset(@UnknownKeyFor @NonNull @Initialized long offset) {
        Cursor.Builder cursor = Cursor.newBuilder().setOffset(offset);
        Timestamp publishTime = Timestamps.fromMillis(10000L + offset);

        SequencedMessage.Builder message = SequencedMessage.newBuilder();
        message.setCursor(cursor);
        message.setPublishTime(publishTime);
        message.setSizeBytes(1024L);
        return message.build();
    }

    /**
     * Initializes the annotated mocks and spies, stubs the subscriber factory so that it records
     * the consumer it receives and hands back the spied subscriber, then constructs the processor
     * under test. Construction is expected to invoke the factory, which the final assertion checks.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(subscriberFactory.apply(ArgumentMatchers.any())).then(invocation -> {
            // Keep hold of the consumer so tests can push messages straight into the processor.
            leakedConsumer = invocation.getArgument(0);
            return subscriber;
        });
        processor = new SubscriptionPartitionProcessorImpl(tracker, receiver, subscriberFactory, SubscriberOptions.DEFAULT_FLOW_CONTROL);
        Assert.assertNotNull(leakedConsumer);
    }

    /**
     * Happy path: after start() the subscriber has been started, awaited, sought to the start of
     * the restriction and granted the default flow control allowance; after close() it has been
     * stopped and its termination awaited.
     */
    @Test
    public void lifecycle() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new OffsetRange(startOffset.value(), Long.MAX_VALUE));
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any())).thenReturn(ApiFutures.immediateFuture(startOffset));

        processor.start();

        SeekRequest expectedSeek = SeekRequest.newBuilder()
            .setCursor(Cursor.newBuilder().setOffset(startOffset.value()))
            .build();
        FlowControlRequest expectedFlow = FlowControlRequest.newBuilder()
            .setAllowedBytes(SubscriberOptions.DEFAULT_FLOW_CONTROL.bytesOutstanding())
            .setAllowedMessages(SubscriberOptions.DEFAULT_FLOW_CONTROL.messagesOutstanding())
            .build();
        Mockito.verify(subscriber).startAsync();
        Mockito.verify(subscriber).awaitRunning();
        Mockito.verify(subscriber).seek(expectedSeek);
        Mockito.verify(subscriber).allowFlow(expectedFlow);

        processor.close();

        Mockito.verify(subscriber).stopAsync();
        Mockito.verify(subscriber).awaitTerminated();
    }

    /**
     * Verifies that start() throws a CheckedApiException when the subscriber's seek future fails.
     *
     * <p>Only the seek is made to fail. No other subscriber call is stubbed to throw, so the
     * assertion can only be satisfied by the seek failure being surfaced.
     */
    @Test
    public void lifecycleSeekThrows() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new OffsetRange(startOffset.value(), Long.MAX_VALUE));
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any()))
            .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE)));

        Assert.assertThrows(CheckedApiException.class, () -> processor.start());
    }

    /**
     * Verifies that start() throws a CheckedApiException when granting the initial flow control
     * allowance fails.
     *
     * <p>The seek is stubbed to succeed so that the exception thrown by allowFlow() is the only
     * failure in play.
     *
     * @throws Exception declared only because stubbing allowFlow() invokes a method with a checked
     *     exception in its signature
     */
    @Test
    public void lifecycleFlowControlThrows() throws Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new OffsetRange(startOffset.value(), Long.MAX_VALUE));
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any())).thenReturn(ApiFutures.immediateFuture(startOffset));
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
            .when(subscriber)
            .allowFlow(ArgumentMatchers.<FlowControlRequest>any());

        Assert.assertThrows(CheckedApiException.class, () -> processor.start());
    }

    /**
     * After a successful start(), a failure raised while awaiting subscriber termination must
     * propagate out of close() as an ApiException, and close() must still have asked the
     * subscriber to stop.
     */
    @Test
    public void lifecycleSubscriberAwaitThrows() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        OffsetRange restriction = new OffsetRange(startOffset.value(), Long.MAX_VALUE);
        Mockito.when(tracker.currentRestriction()).thenReturn(restriction);
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any())).thenReturn(ApiFutures.immediateFuture(startOffset));
        processor.start();

        // Stub with the exception carried in the `underlying` field of a CheckedApiException.
        Throwable terminationFailure = new CheckedApiException(StatusCode.Code.INTERNAL).underlying;
        Mockito.doThrow(terminationFailure).when(subscriber).awaitTerminated();

        Assert.assertThrows(ApiException.class, () -> processor.close());
        Mockito.verify(subscriber).stopAsync();
        Mockito.verify(subscriber).awaitTerminated();
    }

    /**
     * A subscriber that fails after start() must cause waitForCompletion() to throw an
     * ApiException carrying the status code of the failure.
     */
    @Test
    public void subscriberFailureFails() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new OffsetRange(startOffset.value(), Long.MAX_VALUE));
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any())).thenReturn(ApiFutures.immediateFuture(startOffset));
        processor.start();

        subscriber.fail(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE));

        ApiException thrown = Assert.assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
        StatusCode.Code actualCode = thrown.getStatusCode().getCode();
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, actualCode);
    }

    /**
     * When allowFlow() throws while a delivered batch is being handled, waitForCompletion() must
     * throw an ApiException carrying the same status code.
     */
    @Test
    public void allowFlowFailureFails() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset startOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new OffsetRange(startOffset.value(), Long.MAX_VALUE));
        Mockito.when(subscriber.seek(ArgumentMatchers.<SeekRequest>any())).thenReturn(ApiFutures.immediateFuture(startOffset));
        processor.start();

        // From here on every claim succeeds but every flow control grant fails.
        Mockito.when(tracker.tryClaim(ArgumentMatchers.<OffsetByteProgress>any())).thenReturn(true);
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
            .when(subscriber)
            .allowFlow(ArgumentMatchers.<FlowControlRequest>any());

        List<SequencedMessage> batch = ImmutableList.of(messageWithOffset(1L));
        leakedConsumer.accept(batch);

        ApiException thrown = Assert.assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
        StatusCode.Code actualCode = thrown.getStatusCode().getCode();
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, actualCode);
    }

    /**
     * With no messages delivered, waiting for 10 ms must yield a resume continuation and leave
     * lastClaimed() empty.
     */
    @Test
    public void timeoutReturnsResume() {
        Duration timeout = Duration.millis(10L);
        DoFn.ProcessContinuation continuation = processor.waitForCompletion(timeout);

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), continuation);
        Assert.assertFalse(processor.lastClaimed().isPresent());
    }

    /**
     * A rejected claim must make waitForCompletion() return stop with nothing recorded as
     * claimed, and batches delivered afterwards must not trigger any further claim attempt.
     */
    @Test
    public void failedClaimCausesStop() {
        Mockito.when(tracker.tryClaim(ArgumentMatchers.<OffsetByteProgress>any())).thenReturn(false);

        List<SequencedMessage> firstBatch = ImmutableList.of(messageWithOffset(1L));
        leakedConsumer.accept(firstBatch);
        Mockito.verify(tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.<OffsetByteProgress>any());

        DoFn.ProcessContinuation continuation = processor.waitForCompletion(Duration.millis(10L));
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
        Assert.assertFalse(processor.lastClaimed().isPresent());

        // The invocation count must still be exactly one after a second delivery.
        List<SequencedMessage> secondBatch = ImmutableList.of(messageWithOffset(2L));
        leakedConsumer.accept(secondBatch);
        Mockito.verify(tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.<OffsetByteProgress>any());
    }

    /**
     * Delivers a two-message batch with every claim accepted and checks, in order, that the
     * processor claims the last offset together with the summed byte size, outputs each message
     * with its publish time as the timestamp, and then returns flow control tokens for the batch.
     * A subsequent 10 ms wait must yield resume, and lastClaimed() must report offset 3.
     */
    @Test
    public void successfulClaimThenTimeout() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Mockito.when(tracker.tryClaim(ArgumentMatchers.<OffsetByteProgress>any())).thenReturn(true);
        SequencedMessage message1 = messageWithOffset(1L);
        SequencedMessage message3 = messageWithOffset(3L);
        long batchBytes = message1.getSizeBytes() + message3.getSizeBytes();

        leakedConsumer.accept(ImmutableList.of(message1, message3));

        InOrder order = Mockito.inOrder(tracker, receiver, subscriber);
        order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(3L), batchBytes));
        order.verify(receiver).outputWithTimestamp(message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
        order.verify(receiver).outputWithTimestamp(message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));
        order.verify(subscriber).allowFlow(
            FlowControlRequest.newBuilder().setAllowedMessages(2L).setAllowedBytes(batchBytes).build());

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), processor.waitForCompletion(Duration.millis(10L)));
        // Expected value goes first so that a failure message labels the two values correctly.
        Assert.assertEquals(Offset.of(3L), processor.lastClaimed().get());
    }

    /**
     * Abstract {@link Subscriber} layered on {@link FakeApiService}; it is the declared type of
     * the spied {@code subscriber} field. It declares no members of its own and relies on the
     * implicit no-argument constructor.
     */
    abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}
}

