/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteRange;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessorImpl;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link SubscriptionPartitionProcessorImpl}.
 *
 * <p>Each test builds a processor around a spied {@link RestrictionTracker}, a mocked
 * {@link DoFn.OutputReceiver} and a spied {@link FakeSubscriber}. The subscriber factory is mocked
 * so that the message consumer handed to it by the processor is captured in
 * {@link #leakedConsumer}; tests then push messages through that consumer directly.
 */
@RunWith(JUnit4.class)
public class SubscriptionPartitionProcessorImplTest {
    /** Spied tracker; tests stub {@code currentRestriction()} and {@code tryClaim(...)} as needed. */
    @Spy
    RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;

    /** Mocked output sink used to verify emitted messages and their timestamps. */
    @Mock
    DoFn.OutputReceiver<SequencedMessage> receiver;

    /** Mocked factory; stubbed in {@link #setUp()} to capture the consumer and return the spy. */
    @Mock
    Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;

    /** Fails any test that runs longer than 30 seconds. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(30L);

    /** Spied fake subscriber returned by {@link #subscriberFactory}. */
    @Spy
    FakeSubscriber subscriber;

    /** The consumer the processor passed to {@link #subscriberFactory}, captured in setUp. */
    Consumer<List<SequencedMessage>> leakedConsumer;

    /** The processor under test. */
    SubscriptionPartitionProcessor processor;

    /**
     * Builds a 1024-byte message at the given offset whose publish time is
     * {@code 10000 + offset} milliseconds.
     */
    private static SequencedMessage messageWithOffset(long offset) {
        return SequencedMessage.newBuilder()
            .setCursor(Cursor.newBuilder().setOffset(offset))
            .setPublishTime(Timestamps.fromMillis(10000L + offset))
            .setSizeBytes(1024L)
            .build();
    }

    /** Returns a restriction spanning from the example offset to {@code Long.MAX_VALUE}. */
    private OffsetByteRange initialRange() {
        long start = UnitTestExamples.example(Offset.class).value();
        return OffsetByteRange.of(new OffsetRange(start, Long.MAX_VALUE));
    }

    /**
     * Starts {@code processor.runFor(duration)} on the shared futures executor.
     *
     * <p>The returned future completes with the value {@code runFor} returned, or fails with
     * whatever it threw. Assertions on the outcome are therefore made on the test thread, where a
     * failure is reported directly instead of leaving the test blocked until the timeout rule fires.
     */
    private SettableApiFuture<DoFn.ProcessContinuation> runForAsync(Duration duration) {
        SettableApiFuture<DoFn.ProcessContinuation> outcome = SettableApiFuture.create();
        SystemExecutors.getFuturesExecutor().execute(() -> {
            try {
                outcome.set(this.processor.runFor(duration));
            } catch (Throwable t) {
                // Propagate to the waiting test thread rather than losing it on the executor.
                outcome.setException(t);
            }
        });
        return outcome;
    }

    /**
     * Initializes the mocks, wires the factory to capture the consumer, and constructs the
     * processor. Asserts that construction already invoked the factory.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.subscriberFactory.apply(ArgumentMatchers.any())).then(args -> {
            this.leakedConsumer = args.getArgument(0);
            return this.subscriber;
        });
        this.processor = new SubscriptionPartitionProcessorImpl(
            this.tracker, this.receiver, this.subscriberFactory, SubscriberOptions.DEFAULT_FLOW_CONTROL);
        Assert.assertNotNull(this.leakedConsumer);
    }

    /**
     * A short run with no messages returns resume, and the subscriber is started, awaited,
     * granted the default flow-control allowance, stopped and awaited for termination, in order.
     */
    @Test
    public void lifecycle() throws Exception {
        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.initialRange());
        Assert.assertEquals(
            DoFn.ProcessContinuation.resume(), this.processor.runFor(Duration.millis(10L)));

        FlowControlRequest initialAllowance = FlowControlRequest.newBuilder()
            .setAllowedBytes(SubscriberOptions.DEFAULT_FLOW_CONTROL.bytesOutstanding())
            .setAllowedMessages(SubscriberOptions.DEFAULT_FLOW_CONTROL.messagesOutstanding())
            .build();
        InOrder order = Mockito.inOrder(this.subscriber);
        order.verify(this.subscriber).startAsync();
        order.verify(this.subscriber).awaitRunning();
        order.verify(this.subscriber).allowFlow(initialAllowance);
        order.verify(this.subscriber).stopAsync();
        order.verify(this.subscriber).awaitTerminated();
    }

    /** A failing {@code allowFlow} call makes {@code runFor} throw an {@link ApiException}. */
    @Test
    public void lifecycleFlowControlThrows() throws Exception {
        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.initialRange());
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
            .when(this.subscriber)
            .allowFlow(ArgumentMatchers.any());
        Assert.assertThrows(ApiException.class, () -> this.processor.runFor(Duration.ZERO));
    }

    /**
     * A subscriber that fails while being awaited makes {@code runFor} throw an
     * {@link ApiException} carrying the subscriber's status code.
     */
    @Test
    public void subscriberFailureFails() throws Exception {
        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.initialRange());
        Mockito.doAnswer(args -> {
            this.subscriber.fail(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE));
            return null;
        }).when(this.subscriber).awaitRunning();

        ApiException e = Assert.assertThrows(
            ApiException.class, () -> this.processor.runFor(Duration.standardMinutes(2L)));
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, e.getStatusCode().getCode());
    }

    /**
     * With claims succeeding and a message delivered, a failing {@code allowFlow} makes
     * {@code runFor} throw an {@link ApiException} with the same status code.
     */
    @Test
    public void allowFlowFailureFails() throws Exception {
        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.initialRange());
        Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true);
        Mockito.doThrow(new CheckedApiException(StatusCode.Code.OUT_OF_RANGE))
            .when(this.subscriber)
            .allowFlow(ArgumentMatchers.any());
        SystemExecutors.getFuturesExecutor()
            .execute(() -> this.leakedConsumer.accept(ImmutableList.of(messageWithOffset(1L))));

        ApiException e = Assert.assertThrows(
            ApiException.class, () -> this.processor.runFor(Duration.standardHours(10L)));
        Assert.assertEquals(StatusCode.Code.OUT_OF_RANGE, e.getStatusCode().getCode());
    }

    /** A run with no messages returns resume and leaves nothing claimed. */
    @Test
    public void timeoutReturnsResume() {
        Assert.assertEquals(
            DoFn.ProcessContinuation.resume(), this.processor.runFor(Duration.millis(10L)));
        Assert.assertFalse(this.processor.lastClaimed().isPresent());
    }

    /**
     * When the tracker rejects the claim, {@code runFor} returns stop, nothing is recorded as
     * claimed, and a later delivery does not trigger another claim attempt.
     */
    @Test
    public void failedClaimCausesStop() throws Exception {
        Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(false);
        SettableApiFuture<DoFn.ProcessContinuation> runDone =
            this.runForAsync(Duration.standardHours(10L));

        this.leakedConsumer.accept(ImmutableList.of(messageWithOffset(1L)));
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), runDone.get());

        Mockito.verify(this.tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
        Assert.assertFalse(this.processor.lastClaimed().isPresent());

        // A delivery after the run has finished must not reach the tracker again.
        this.leakedConsumer.accept(ImmutableList.of(messageWithOffset(2L)));
        Mockito.verify(this.tracker, Mockito.times(1)).tryClaim(ArgumentMatchers.any());
    }

    /**
     * When the claim succeeds, the batch is claimed once (last offset, summed bytes), each message
     * is output with its publish time, flow-control tokens for the batch are returned, and the run
     * ends with resume once its time budget elapses.
     */
    @Test
    public void successfulClaimThenTimeout() throws Exception {
        Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true);
        SettableApiFuture<DoFn.ProcessContinuation> runDone =
            this.runForAsync(Duration.standardSeconds(3L));

        SequencedMessage message1 = messageWithOffset(1L);
        SequencedMessage message3 = messageWithOffset(3L);
        long batchBytes = message1.getSizeBytes() + message3.getSizeBytes();
        this.leakedConsumer.accept(ImmutableList.of(message1, message3));
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), runDone.get());

        InOrder order = Mockito.inOrder(this.tracker, this.receiver, this.subscriber);
        order.verify(this.tracker).tryClaim(OffsetByteProgress.of(Offset.of(3L), batchBytes));
        order.verify(this.receiver)
            .outputWithTimestamp(
                message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
        order.verify(this.receiver)
            .outputWithTimestamp(
                message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));
        order.verify(this.subscriber)
            .allowFlow(
                FlowControlRequest.newBuilder()
                    .setAllowedMessages(2L)
                    .setAllowedBytes(batchBytes)
                    .build());
        Assert.assertEquals(Offset.of(3L), this.processor.lastClaimed().get());
    }

    /**
     * Abstract combination of {@link FakeApiService} and {@link Subscriber}, instantiated through
     * Mockito's {@code @Spy} so tests can stub and verify subscriber calls.
     */
    static abstract class FakeSubscriber
    extends FakeApiService
    implements Subscriber {
        FakeSubscriber() {
        }
    }
}

