package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.util.Optional;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest.class */
public class PerSubscriptionPartitionSdfTest {
    private static final OffsetByteRange RESTRICTION = OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
    private static final SubscriptionPartition PARTITION = SubscriptionPartition.of((SubscriptionPath) UnitTestExamples.example(SubscriptionPath.class), (Partition) UnitTestExamples.example(Partition.class));

    @Mock
    SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;

    @Mock
    ManagedFactory<TopicBacklogReader> backlogReaderFactory;

    @Mock
    TopicBacklogReader backlogReader;

    @Mock
    SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;

    @Mock
    SubscriptionPartitionProcessorFactory processorFactory;

    @Mock
    ManagedFactory<BlockingCommitter> committerFactory;

    @Mock
    InitialOffsetReader initialOffsetReader;

    @Spy
    TrackerWithProgress tracker;

    @Mock
    DoFn.OutputReceiver<SequencedMessage> output;

    @Mock
    SubscriptionPartitionProcessor processor;

    @Mock
    BlockingCommitter committer;
    PerSubscriptionPartitionSdf sdf;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest$NoopManagedFactory.class */
    private static final class NoopManagedFactory<T extends AutoCloseable> implements ManagedFactory<T> {
        private NoopManagedFactory() {
        }

        public T create(SubscriptionPartition subscriptionPartition) {
            return null;
        }

        public void close() {
        }
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.when((InitialOffsetReader) this.offsetReaderFactory.apply((SubscriptionPartition) ArgumentMatchers.any())).thenReturn(this.initialOffsetReader);
        Mockito.when(this.processorFactory.newProcessor((SubscriptionPartition) ArgumentMatchers.any(), (RestrictionTracker) ArgumentMatchers.any(), (DoFn.OutputReceiver) ArgumentMatchers.any())).thenReturn(this.processor);
        Mockito.when((TrackerWithProgress) this.trackerFactory.apply((TopicBacklogReader) ArgumentMatchers.any(), (OffsetByteRange) ArgumentMatchers.any())).thenReturn(this.tracker);
        Mockito.when(this.committerFactory.create((SubscriptionPartition) ArgumentMatchers.any())).thenReturn(this.committer);
        Mockito.when((OffsetByteRange) this.tracker.currentRestriction()).thenReturn(RESTRICTION);
        Mockito.when(this.backlogReaderFactory.create((SubscriptionPartition) ArgumentMatchers.any())).thenReturn(this.backlogReader);
        this.sdf = new PerSubscriptionPartitionSdf(this.backlogReaderFactory, this.committerFactory, this.offsetReaderFactory, this.trackerFactory, this.processorFactory);
    }

    @Test
    public void getInitialRestrictionReadSuccess() {
        Mockito.when(this.initialOffsetReader.read()).thenReturn((Offset) UnitTestExamples.example(Offset.class));
        OffsetByteRange initialRestriction = this.sdf.getInitialRestriction(PARTITION);
        Assert.assertEquals(((Offset) UnitTestExamples.example(Offset.class)).value(), initialRestriction.getRange().getFrom());
        Assert.assertEquals(Long.MAX_VALUE, initialRestriction.getRange().getTo());
        Assert.assertEquals(0L, initialRestriction.getByteCount());
        ((SerializableFunction) Mockito.verify(this.offsetReaderFactory)).apply(PARTITION);
    }

    @Test
    public void getInitialRestrictionReadFailure() {
        Mockito.when(this.initialOffsetReader.read()).thenThrow(new Throwable[]{new CheckedApiException(StatusCode.Code.INTERNAL).underlying});
        Assert.assertThrows(ApiException.class, () -> {
            this.sdf.getInitialRestriction(PARTITION);
        });
    }

    @Test
    public void newTrackerCallsFactory() {
        Assert.assertSame(this.tracker, this.sdf.newTracker(PARTITION, RESTRICTION));
        ((SerializableBiFunction) Mockito.verify(this.trackerFactory)).apply(this.backlogReader, RESTRICTION);
    }

    @Test
    public void tearDownClosesBacklogReaderFactory() throws Exception {
        this.sdf.teardown();
        ((ManagedFactory) Mockito.verify(this.backlogReaderFactory)).close();
    }

    @Test
    public void process() throws Exception {
        Mockito.when(this.processor.run()).thenReturn(DoFn.ProcessContinuation.resume());
        Mockito.when(this.processorFactory.newProcessor((SubscriptionPartition) ArgumentMatchers.any(), (RestrictionTracker) ArgumentMatchers.any(), (DoFn.OutputReceiver) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            RestrictionTracker restrictionTracker = (RestrictionTracker) invocationOnMock.getArgument(1);
            Mockito.when(Boolean.valueOf(this.tracker.tryClaim((OffsetByteProgress) ArgumentMatchers.any()))).thenReturn(true).thenReturn(false);
            Assert.assertTrue(restrictionTracker.tryClaim(OffsetByteProgress.of((Offset) UnitTestExamples.example(Offset.class), 123L)));
            Assert.assertFalse(restrictionTracker.tryClaim(OffsetByteProgress.of(Offset.of(333333L), 123L)));
            return this.processor;
        });
        ((SubscriptionPartitionProcessor) Mockito.doReturn(Optional.of((Offset) UnitTestExamples.example(Offset.class))).when(this.processor)).lastClaimed();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), this.sdf.processElement(this.tracker, PARTITION, this.output));
        ((SubscriptionPartitionProcessorFactory) Mockito.verify(this.processorFactory)).newProcessor((SubscriptionPartition) ArgumentMatchers.eq(PARTITION), (RestrictionTracker) ArgumentMatchers.any(), (DoFn.OutputReceiver) ArgumentMatchers.eq(this.output));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.processor});
        ((SubscriptionPartitionProcessor) inOrder.verify(this.processor)).run();
        ((SubscriptionPartitionProcessor) inOrder.verify(this.processor)).lastClaimed();
        InOrder inOrder2 = Mockito.inOrder(new Object[]{this.committerFactory, this.committer});
        ((ManagedFactory) inOrder2.verify(this.committerFactory)).create(PARTITION);
        ((BlockingCommitter) inOrder2.verify(this.committer)).commitOffset(Offset.of(((Offset) UnitTestExamples.example(Offset.class)).value() + 1));
    }

    @Test
    public void dofnIsSerializable() throws Exception {
        new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(new PerSubscriptionPartitionSdf(new NoopManagedFactory(), new NoopManagedFactory(), subscriptionPartition -> {
            return null;
        }, (topicBacklogReader, offsetByteRange) -> {
            return null;
        }, (subscriptionPartition2, restrictionTracker, outputReceiver) -> {
            return null;
        }));
    }

    @Test
    public void getProgressUnboundedRangeDelegates() {
        RestrictionTracker.Progress from = RestrictionTracker.Progress.from(0.0d, 0.2d);
        Mockito.when(this.tracker.getProgress()).thenReturn(from);
        Assert.assertTrue(DoubleMath.fuzzyEquals(from.getWorkRemaining(), this.sdf.getSize(PARTITION, RESTRICTION), 1.0E-4d));
        ((TrackerWithProgress) Mockito.verify(this.tracker)).getProgress();
    }

    @Test
    public void getProgressBoundedReturnsBytes() {
        Assert.assertTrue(DoubleMath.fuzzyEquals(123.0d, this.sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87L, 8000L), 123L)), 1.0E-4d));
        Mockito.verifyNoInteractions(new Object[]{this.tracker});
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1739710941:
                if (implMethodName.equals("lambda$dofnIsSerializable$bd98a50f$1")) {
                    z = 2;
                    break;
                }
                break;
            case -841814665:
                if (implMethodName.equals("lambda$dofnIsSerializable$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case -53468240:
                if (implMethodName.equals("lambda$dofnIsSerializable$eba53d0$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReader;Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRange;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/TrackerWithProgress;")) {
                    return (topicBacklogReader, offsetByteRange) -> {
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader;")) {
                    return subscriptionPartition -> {
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("newProcessor") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lorg/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker;Lorg/apache/beam/sdk/transforms/DoFn$OutputReceiver;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartition;Lorg/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker;Lorg/apache/beam/sdk/transforms/DoFn$OutputReceiver;)Lorg/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor;")) {
                    return (subscriptionPartition2, restrictionTracker, outputReceiver) -> {
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
