/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.BlockingCommitter;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.InitialOffsetReader;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.ManagedBacklogReaderFactory;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteProgress;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.OffsetByteRange;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.PerSubscriptionPartitionSdf;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartition;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessor;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscriptionPartitionProcessorFactory;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.TopicBacklogReader;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.TrackerWithProgress;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

/**
 * Unit tests for {@link PerSubscriptionPartitionSdf}.
 *
 * <p>Every collaborator handed to the SDF constructor is a Mockito mock (the tracker is a spy),
 * so each test only checks how the SDF delegates to its factories, tracker, processor and
 * committer.
 */
@RunWith(JUnit4.class)
public class PerSubscriptionPartitionSdfTest {
    /** Unbounded restriction (upper bound {@code Long.MAX_VALUE}) with a zero byte count. */
    private static final OffsetByteRange RESTRICTION = OffsetByteRange.of(new OffsetRange(1L, Long.MAX_VALUE), 0L);
    /** Subscription partition built from the library's canonical example values. */
    private static final SubscriptionPartition PARTITION = SubscriptionPartition.of(UnitTestExamples.example(SubscriptionPath.class), UnitTestExamples.example(Partition.class));

    @Mock
    SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
    @Mock
    ManagedBacklogReaderFactory backlogReaderFactory;
    @Mock
    TopicBacklogReader backlogReader;
    @Mock
    SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;
    @Mock
    SubscriptionPartitionProcessorFactory processorFactory;
    @Mock
    SerializableFunction<SubscriptionPartition, BlockingCommitter> committerFactory;
    @Mock
    InitialOffsetReader initialOffsetReader;
    @Spy
    TrackerWithProgress tracker;
    @Mock
    DoFn.OutputReceiver<SequencedMessage> output;
    @Mock
    SubscriptionPartitionProcessor processor;
    @Mock
    BlockingCommitter committer;
    PerSubscriptionPartitionSdf sdf;

    /**
     * Initializes the annotated mocks, wires every factory to return its matching mock, and
     * builds the SDF under test from those factories.
     */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.offsetReaderFactory.apply(ArgumentMatchers.any())).thenReturn(this.initialOffsetReader);
        Mockito.when(this.processorFactory.newProcessor(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(this.processor);
        Mockito.when(this.trackerFactory.apply(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(this.tracker);
        Mockito.when(this.committerFactory.apply(ArgumentMatchers.any())).thenReturn(this.committer);
        Mockito.when(this.tracker.currentRestriction()).thenReturn(RESTRICTION);
        Mockito.when(this.backlogReaderFactory.newReader(ArgumentMatchers.any())).thenReturn(this.backlogReader);
        this.sdf = new PerSubscriptionPartitionSdf(this.backlogReaderFactory, this.offsetReaderFactory, this.trackerFactory, this.processorFactory, this.committerFactory);
    }

    /**
     * The initial restriction starts at the offset returned by the initial offset reader, ends at
     * {@code Long.MAX_VALUE}, has a zero byte count, and the reader factory is invoked for the
     * partition.
     */
    @Test
    public void getInitialRestrictionReadSuccess() {
        Mockito.when(this.initialOffsetReader.read()).thenReturn(UnitTestExamples.example(Offset.class));
        OffsetByteRange range = this.sdf.getInitialRestriction(PARTITION);
        Assert.assertEquals(UnitTestExamples.example(Offset.class).value(), range.getRange().getFrom());
        Assert.assertEquals(Long.MAX_VALUE, range.getRange().getTo());
        Assert.assertEquals(0L, range.getByteCount());
        Mockito.verify(this.offsetReaderFactory).apply(PARTITION);
    }

    /** An {@link ApiException} thrown by the initial offset reader surfaces to the caller. */
    @Test
    public void getInitialRestrictionReadFailure() {
        Mockito.when(this.initialOffsetReader.read()).thenThrow(new CheckedApiException(StatusCode.Code.INTERNAL).underlying);
        Assert.assertThrows(ApiException.class, () -> this.sdf.getInitialRestriction(PARTITION));
    }

    /**
     * {@code newTracker} returns the tracker factory's result, and the factory receives the
     * backlog reader created for the partition together with the restriction.
     */
    @Test
    public void newTrackerCallsFactory() {
        Assert.assertSame(this.tracker, this.sdf.newTracker(PARTITION, RESTRICTION));
        Mockito.verify(this.trackerFactory).apply(this.backlogReader, RESTRICTION);
    }

    /** {@code teardown} closes the backlog reader factory. */
    @Test
    public void tearDownClosesBacklogReaderFactory() {
        this.sdf.teardown();
        Mockito.verify(this.backlogReaderFactory).close();
    }

    /**
     * {@code processElement} returns the processor's continuation, forwards claims made on the
     * tracker it hands to the processor factory to the underlying tracker, and afterwards commits
     * the offset one past the processor's last claimed offset.
     */
    @Test
    public void process() throws Exception {
        Mockito.when(this.processor.run()).thenReturn(DoFn.ProcessContinuation.resume());
        Mockito.when(this.processorFactory.newProcessor(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenAnswer(args -> {
            // The tracker handed to the factory must delegate tryClaim to the spy tracker:
            // the first claim is stubbed to succeed and the second to fail.
            RestrictionTracker<OffsetByteRange, OffsetByteProgress> wrapped = args.getArgument(1);
            Mockito.when(this.tracker.tryClaim(ArgumentMatchers.any())).thenReturn(true).thenReturn(false);
            Assert.assertTrue(wrapped.tryClaim(OffsetByteProgress.of(UnitTestExamples.example(Offset.class), 123L)));
            Assert.assertFalse(wrapped.tryClaim(OffsetByteProgress.of(Offset.of(333333L), 123L)));
            return this.processor;
        });
        Mockito.doReturn(Optional.of(UnitTestExamples.example(Offset.class))).when(this.processor).lastClaimed();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), this.sdf.processElement(this.tracker, PARTITION, this.output));
        Mockito.verify(this.processorFactory).newProcessor(ArgumentMatchers.eq(PARTITION), ArgumentMatchers.any(), ArgumentMatchers.eq(this.output));
        // The processor must run before its last claimed offset is queried.
        InOrder processorOrder = Mockito.inOrder(this.processor);
        processorOrder.verify(this.processor).run();
        processorOrder.verify(this.processor).lastClaimed();
        // The committer is created for the partition, then commits lastClaimed + 1.
        InOrder commitOrder = Mockito.inOrder(this.committerFactory, this.committer);
        commitOrder.verify(this.committerFactory).apply(PARTITION);
        commitOrder.verify(this.committer).commitOffset(Offset.of(UnitTestExamples.example(Offset.class).value() + 1L));
    }

    /**
     * An SDF built from a no-op backlog reader factory and lambda factories can be written with
     * Java serialization without throwing.
     */
    @Test
    public void dofnIsSerializable() throws Exception {
        // try-with-resources guarantees the stream is closed even if writeObject throws.
        try (ObjectOutputStream objectStream = new ObjectOutputStream(new ByteArrayOutputStream())) {
            objectStream.writeObject(new PerSubscriptionPartitionSdf(new NoopManagedBacklogReaderFactory(), x -> null, (x, y) -> null, (x, y, z) -> null, x -> null));
        }
    }

    /**
     * For the unbounded restriction, {@code getSize} matches the tracker's reported remaining
     * work and the tracker's progress is queried.
     */
    @Test
    public void getProgressUnboundedRangeDelegates() {
        RestrictionTracker.Progress progress = RestrictionTracker.Progress.from(0.0, 0.2);
        Mockito.when(this.tracker.getProgress()).thenReturn(progress);
        Assert.assertTrue(DoubleMath.fuzzyEquals(progress.getWorkRemaining(), this.sdf.getSize(PARTITION, RESTRICTION), 1.0E-4));
        Mockito.verify(this.tracker).getProgress();
    }

    /**
     * For a bounded restriction, {@code getSize} equals the restriction's byte count and the
     * tracker is never touched.
     */
    @Test
    public void getProgressBoundedReturnsBytes() {
        Assert.assertTrue(DoubleMath.fuzzyEquals(123.0, this.sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87L, 8000L), 123L)), 1.0E-4));
        Mockito.verifyNoInteractions(this.tracker);
    }

    /** Stateless factory that is not a Mockito mock; used by the serialization test. */
    private static final class NoopManagedBacklogReaderFactory
    implements ManagedBacklogReaderFactory {
        private NoopManagedBacklogReaderFactory() {
        }

        @Override
        public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
            return null;
        }

        @Override
        public void close() {
        }
    }
}

