package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.class */
public class ReadChangeStreamPartitionDoFnTest {
    private ChangeStreamDao changeStreamDao;
    private MetadataTableDao metadataTableDao;
    private CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;
    private DaoFactory daoFactory;
    private ActionFactory actionFactory;
    private ChangeStreamMetrics metrics;
    private ReadChangeStreamPartitionDoFn doFn;

    @Before
    public void setup() throws IOException {
        Duration standardSeconds = Duration.standardSeconds(1L);
        this.daoFactory = (DaoFactory) Mockito.mock(DaoFactory.class);
        this.changeStreamDao = (ChangeStreamDao) Mockito.mock(ChangeStreamDao.class);
        this.metadataTableDao = (MetadataTableDao) Mockito.mock(MetadataTableDao.class);
        Mockito.when(this.daoFactory.getChangeStreamDao()).thenReturn(this.changeStreamDao);
        Mockito.when(this.daoFactory.getMetadataTableDao()).thenReturn(this.metadataTableDao);
        Mockito.when(this.daoFactory.getChangeStreamName()).thenReturn("test-id");
        this.actionFactory = (ActionFactory) Mockito.mock(ActionFactory.class);
        this.metrics = (ChangeStreamMetrics) Mockito.mock(ChangeStreamMetrics.class);
        this.sizeEstimator = (CoderSizeEstimator) Mockito.mock(CoderSizeEstimator.class);
        ChangeStreamAction changeStreamAction = new ChangeStreamAction(this.metrics);
        ReadChangeStreamPartitionAction readChangeStreamPartitionAction = new ReadChangeStreamPartitionAction(this.metadataTableDao, this.changeStreamDao, this.metrics, changeStreamAction, standardSeconds, this.sizeEstimator);
        Mockito.when(this.actionFactory.changeStreamAction(this.metrics)).thenReturn(changeStreamAction);
        Mockito.when(this.actionFactory.readChangeStreamPartitionAction(this.metadataTableDao, this.changeStreamDao, this.metrics, changeStreamAction, standardSeconds, this.sizeEstimator)).thenReturn(readChangeStreamPartitionAction);
        this.doFn = new ReadChangeStreamPartitionDoFn(this.daoFactory, this.actionFactory, this.metrics, Duration.ZERO);
        this.doFn.setSizeEstimator(this.sizeEstimator);
    }

    @Test
    public void testProcessElementAndGetSize() throws IOException, InterruptedException {
        Instant minus = Instant.now().minus(Duration.standardSeconds(10L));
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(create, "test");
        PartitionRecord partitionRecord = new PartitionRecord(create, minus, "uid-a", minus, Collections.emptyList(), Instant.now().plus(Duration.standardSeconds(60L)));
        Mockito.when(Long.valueOf(this.sizeEstimator.sizeOf((KV) ArgumentMatchers.any()))).thenReturn(100L);
        ReadChangeStreamPartitionProgressTracker readChangeStreamPartitionProgressTracker = (ReadChangeStreamPartitionProgressTracker) Mockito.mock(ReadChangeStreamPartitionProgressTracker.class);
        Mockito.when(readChangeStreamPartitionProgressTracker.currentRestriction()).thenReturn(new StreamProgress());
        DoFn.OutputReceiver outputReceiver = (DoFn.OutputReceiver) Mockito.mock(DoFn.OutputReceiver.class);
        ManualWatermarkEstimator manualWatermarkEstimator = (ManualWatermarkEstimator) Mockito.mock(ManualWatermarkEstimator.class);
        this.doFn.setup();
        ByteString copyFromUtf8 = ByteString.copyFromUtf8("a");
        ChangeStreamRecord changeStreamRecord = (ChangeStreamMutation) Mockito.mock(ChangeStreamMutation.class);
        Mockito.when(changeStreamRecord.getRowKey()).thenReturn(copyFromUtf8);
        Mockito.when(changeStreamRecord.getEstimatedLowWatermark()).thenReturn(TimestampConverter.toThreetenInstant(minus));
        Mockito.when(changeStreamRecord.getToken()).thenReturn(create2.getToken());
        Mockito.when(changeStreamRecord.getCommitTimestamp()).thenReturn(TimestampConverter.toThreetenInstant(minus));
        Mockito.when(Boolean.valueOf(this.metadataTableDao.lockAndRecordPartition((PartitionRecord) ArgumentMatchers.any()))).thenReturn(true);
        ServerStream serverStream = (ServerStream) Mockito.mock(ServerStream.class);
        Iterator it = (Iterator) Mockito.mock(Iterator.class);
        Mockito.when(Boolean.valueOf(it.hasNext())).thenReturn(true, new Boolean[]{true, true});
        Mockito.when((ChangeStreamRecord) it.next()).thenReturn(changeStreamRecord, new ChangeStreamRecord[]{changeStreamRecord, changeStreamRecord});
        Mockito.when(serverStream.iterator()).thenReturn(it);
        Mockito.when(this.changeStreamDao.readChangeStreamPartition((PartitionRecord) ArgumentMatchers.any(), (StreamProgress) ArgumentMatchers.any(), (Instant) ArgumentMatchers.any(), (Duration) ArgumentMatchers.any())).thenReturn(serverStream);
        Mockito.when((Instant) manualWatermarkEstimator.getState()).thenReturn(minus);
        Mockito.when(Boolean.valueOf(readChangeStreamPartitionProgressTracker.tryClaim((StreamProgress) ArgumentMatchers.any()))).thenReturn(true, new Boolean[]{true, false});
        this.doFn.processElement(partitionRecord, readChangeStreamPartitionProgressTracker, outputReceiver, manualWatermarkEstimator);
        Assert.assertEquals(this.doFn.getSize(new StreamProgress(create2, minus, BigDecimal.valueOf(20L), Instant.now(), false)), ((100 * 2) / 10) * 10, 10.0d);
        ((DoFn.OutputReceiver) Mockito.verify(outputReceiver, Mockito.times(2))).outputWithTimestamp(KV.of(copyFromUtf8, changeStreamRecord), Instant.EPOCH);
    }

    @Test
    public void testGetSizeCantBeNegative() throws IOException {
        Mockito.when(Long.valueOf(this.sizeEstimator.sizeOf((KV) ArgumentMatchers.any()))).thenReturn(100L);
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("", ""), "test");
        this.doFn.setup();
        Assert.assertEquals(0.0d, this.doFn.getSize(new StreamProgress(create, Instant.now().plus(Duration.standardMinutes(10L)), BigDecimal.valueOf(1000L), Instant.now().plus(Duration.standardMinutes(10L)), false)), 0.0d);
        Assert.assertEquals(0.0d, this.doFn.getSize(new StreamProgress(create, Instant.now().plus(Duration.standardMinutes(10L)), BigDecimal.valueOf(1000L), Instant.now().plus(Duration.standardMinutes(10L)), true)), 0.0d);
    }

    @Test
    public void backlogReplicationAdjustment() throws IOException {
        SerializableSupplier serializableSupplier = () -> {
            return Instant.ofEpochSecond(1000L);
        };
        this.doFn = new ReadChangeStreamPartitionDoFn(this.daoFactory, this.actionFactory, this.metrics, Duration.standardSeconds(30L), serializableSupplier);
        Mockito.when(Long.valueOf(this.sizeEstimator.sizeOf((KV) ArgumentMatchers.any()))).thenReturn(100L);
        this.doFn.setSizeEstimator(this.sizeEstimator);
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("", ""), "test");
        this.doFn.setup();
        Assert.assertEquals(0.0d, this.doFn.getSize(new StreamProgress(create, ((Instant) serializableSupplier.get()).minus(Duration.standardSeconds(10L)), BigDecimal.valueOf(1000L), ((Instant) serializableSupplier.get()).minus(Duration.standardSeconds(10L)), false)), 0.0d);
        Assert.assertEquals(30000.0d, this.doFn.getSize(new StreamProgress(create, ((Instant) serializableSupplier.get()).minus(Duration.standardSeconds(60L)), BigDecimal.valueOf(1000L), ((Instant) serializableSupplier.get()).minus(Duration.standardSeconds(60L)), false)), 0.0d);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1880805068:
                if (implMethodName.equals("lambda$backlogReplicationAdjustment$8d965cfc$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/joda/time/Instant;")) {
                    return () -> {
                        return Instant.ofEpochSecond(1000L);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
