/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.pubsublite.InitialOffsetReader;
import org.apache.beam.sdk.io.gcp.pubsublite.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.PerSubscriptionPartitionSdf;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriptionPartitionProcessorFactory;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link PerSubscriptionPartitionSdf}.
 *
 * <p>Every collaborator handed to the SDF (offset reader factory, tracker factory, processor
 * factory and committer factory) is a Mockito mock or spy, so the tests only observe how the SDF
 * wires those collaborators together.
 */
@RunWith(JUnit4.class)
public class PerSubscriptionPartitionSdfTest {
    /** Sleep budget handed to the SDF and expected in {@code waitForCompletion} calls. */
    private static final @UnknownKeyFor @NonNull @Initialized Duration MAX_SLEEP_TIME = Duration.standardMinutes(10L).plus(Duration.millis(10L));
    /** Restriction reported by the spied tracker and passed to {@code newTracker}. */
    private static final @UnknownKeyFor @NonNull @Initialized OffsetRange RESTRICTION = new OffsetRange(1L, Long.MAX_VALUE);
    /** Subscription partition used as the element under test. */
    private static final @UnknownKeyFor @NonNull @Initialized SubscriptionPartition PARTITION = SubscriptionPartition.of(UnitTestExamples.example(SubscriptionPath.class), UnitTestExamples.example(Partition.class));

    @Mock
    @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized InitialOffsetReader> offsetReaderFactory;
    @Mock
    @UnknownKeyFor @NonNull @Initialized SerializableBiFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress>> trackerFactory;
    @Mock
    @UnknownKeyFor @NonNull @Initialized SubscriptionPartitionProcessorFactory processorFactory;
    @Mock
    @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized SubscriptionPartition, @UnknownKeyFor @NonNull @Initialized Committer> committerFactory;
    @Mock
    @UnknownKeyFor @NonNull @Initialized InitialOffsetReader initialOffsetReader;
    @Spy
    @UnknownKeyFor @NonNull @Initialized RestrictionTracker<@UnknownKeyFor @NonNull @Initialized OffsetRange, @UnknownKeyFor @NonNull @Initialized OffsetByteProgress> tracker;
    // Type-use annotations on a qualified nested type must be written after the qualifier
    // (Outer.@Ann Inner); placing them before "DoFn." would annotate the scoping construct.
    @Mock
    DoFn.@UnknownKeyFor @NonNull @Initialized OutputReceiver<@UnknownKeyFor @NonNull @Initialized SequencedMessage> output;
    @Mock
    DoFn.@UnknownKeyFor @NonNull @Initialized BundleFinalizer finalizer;
    @Mock
    @UnknownKeyFor @NonNull @Initialized SubscriptionPartitionProcessor processor;
    @Spy
    @UnknownKeyFor @NonNull @Initialized FakeCommitter committer;
    /** The instance under test, rebuilt in {@link #setUp()}. */
    @UnknownKeyFor @NonNull @Initialized PerSubscriptionPartitionSdf sdf;

    /**
     * Initializes the annotated mocks/spies, makes each factory return its matching test double,
     * makes the tracker report {@link #RESTRICTION}, and constructs the SDF under test.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(offsetReaderFactory.apply(ArgumentMatchers.any())).thenReturn(initialOffsetReader);
        Mockito.when(processorFactory.newProcessor(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(processor);
        Mockito.when(trackerFactory.apply(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(tracker);
        Mockito.when(committerFactory.apply(ArgumentMatchers.any())).thenReturn(committer);
        Mockito.when(tracker.currentRestriction()).thenReturn(RESTRICTION);
        sdf = new PerSubscriptionPartitionSdf(MAX_SLEEP_TIME, offsetReaderFactory, trackerFactory, processorFactory, committerFactory);
    }

    /**
     * When the initial offset reader succeeds, the initial restriction starts at the offset it
     * returned, ends at {@code Long.MAX_VALUE}, and the reader factory was asked for
     * {@link #PARTITION}.
     */
    @Test
    public void getInitialRestrictionReadSuccess() {
        Offset initialOffset = UnitTestExamples.example(Offset.class);
        Mockito.when(initialOffsetReader.read()).thenReturn(initialOffset);

        OffsetRange range = sdf.getInitialRestriction(PARTITION);

        Assert.assertEquals(initialOffset.value(), range.getFrom());
        Assert.assertEquals(Long.MAX_VALUE, range.getTo());
        Mockito.verify(offsetReaderFactory).apply(PARTITION);
    }

    /** A failure thrown by the initial offset reader surfaces as an {@link ApiException}. */
    @Test
    public void getInitialRestrictionReadFailure() {
        Mockito.when(initialOffsetReader.read()).thenThrow(new CheckedApiException(StatusCode.Code.INTERNAL).underlying);
        Assert.assertThrows(ApiException.class, () -> sdf.getInitialRestriction(PARTITION));
    }

    /** {@code newTracker} returns exactly what the tracker factory produced for the inputs. */
    @Test
    public void newTrackerCallsFactory() {
        Assert.assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
        Mockito.verify(trackerFactory).apply(PARTITION, RESTRICTION);
    }

    /**
     * Exercises {@code processElement} end to end against the test doubles and verifies that:
     *
     * <ul>
     *   <li>the tracker handed to the processor factory forwards {@code tryClaim} results from the
     *       underlying tracker;
     *   <li>the processor's continuation is returned;
     *   <li>the processor sees {@code start}, {@code waitForCompletion}, {@code lastClaimed},
     *       {@code close} in that order;
     *   <li>a bundle finalizer callback is registered, and running it starts the committer,
     *       commits the offset one past the last claimed offset, and stops the committer.
     * </ul>
     */
    @Test
    public void process() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Offset lastClaimed = UnitTestExamples.example(Offset.class);

        Mockito.when(processor.waitForCompletion(MAX_SLEEP_TIME)).thenReturn(DoFn.ProcessContinuation.resume());
        Mockito.when(processorFactory.newProcessor(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenAnswer(args -> {
            // The SDF passes its own tracker to the factory; claims on it must reach the spy.
            RestrictionTracker<OffsetRange, OffsetByteProgress> wrapped = args.getArgument(1);
            Mockito.when(tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true).thenReturn(false);
            Assert.assertTrue(wrapped.tryClaim(OffsetByteProgress.of(UnitTestExamples.example(Offset.class), 123L)));
            Assert.assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333L), 123L)));
            return processor;
        });

        // Capture the finalization callback so it can be invoked manually below.
        AtomicReference<DoFn.BundleFinalizer.Callback> callbackRef = new AtomicReference<>(null);
        Mockito.doAnswer(args -> {
            callbackRef.set(args.getArgument(1));
            return null;
        }).when(finalizer).afterBundleCommit(ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.doReturn(Optional.of(lastClaimed)).when(processor).lastClaimed();

        Assert.assertEquals(DoFn.ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output, finalizer));

        Mockito.verify(processorFactory).newProcessor(ArgumentMatchers.eq(PARTITION), ArgumentMatchers.any(), ArgumentMatchers.eq(output));
        InOrder processorOrder = Mockito.inOrder(processor);
        processorOrder.verify(processor).start();
        processorOrder.verify(processor).waitForCompletion(MAX_SLEEP_TIME);
        processorOrder.verify(processor).lastClaimed();
        processorOrder.verify(processor).close();
        Mockito.verify(finalizer).afterBundleCommit(ArgumentMatchers.eq(Instant.ofEpochMilli(Long.MAX_VALUE)), ArgumentMatchers.any());
        Assert.assertNotNull(callbackRef.get());

        Mockito.when(committer.commitOffset(ArgumentMatchers.any())).thenReturn(ApiFutures.immediateFuture(null));
        callbackRef.get().onBundleSuccess();

        InOrder committerOrder = Mockito.inOrder(committerFactory, committer);
        committerOrder.verify(committer).startAsync();
        committerOrder.verify(committer).awaitRunning();
        // The committed offset is one past the last claimed offset.
        committerOrder.verify(committer).commitOffset(Offset.of(lastClaimed.value() + 1L));
        committerOrder.verify(committer).stopAsync();
        committerOrder.verify(committer).awaitTerminated();
    }

    /**
     * Java-serializes an SDF built from lambdas; the test passes if {@code writeObject} does not
     * throw.
     */
    @Test
    public void dofnIsSerializable() throws @UnknownKeyFor @NonNull @Initialized Exception {
        // try-with-resources so the stream is closed even if serialization fails.
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // The intersection cast keeps the processor-factory lambda serializable regardless
            // of whether the factory interface itself extends Serializable.
            out.writeObject(new PerSubscriptionPartitionSdf(
                    MAX_SLEEP_TIME,
                    x -> null,
                    (x, y) -> null,
                    (SubscriptionPartitionProcessorFactory & Serializable) (x, y, z) -> null,
                    x -> null));
        }
    }

    /**
     * Abstract {@link Committer} built on {@link FakeApiService}; instantiated by Mockito through
     * the {@code @Spy} field so individual calls can be stubbed and verified.
     */
    abstract static class FakeCommitter
    extends FakeApiService
    implements Committer {
    }
}

