package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.class */
/**
 * Unit test for {@link ReadChangeStreamPartitionDoFn}.
 *
 * <p>The DoFn is wired with real {@link ChangeStreamAction} and
 * {@link ReadChangeStreamPartitionAction} instances, while the DAOs, metrics, size estimator,
 * restriction tracker, output receiver and watermark estimator are Mockito mocks.
 */
public class ReadChangeStreamPartitionDoFnTest {
    private ChangeStreamDao changeStreamDao;
    private MetadataTableDao metadataTableDao;
    private SizeEstimator<KV<ByteString, ChangeStreamMutation>> sizeEstimator;
    private ReadChangeStreamPartitionDoFn doFn;

    /**
     * Builds the DoFn under test on top of mocked DAOs and a mocked {@link ActionFactory} that
     * hands back the real action objects created here.
     *
     * @throws IOException declared for parity with the collaborators' signatures
     */
    @Before
    @SuppressWarnings({"unchecked", "rawtypes"}) // Mockito.mock(Class) cannot carry type arguments.
    public void setup() throws IOException {
        Duration oneSecond = Duration.standardSeconds(1L);

        this.changeStreamDao = Mockito.mock(ChangeStreamDao.class);
        this.metadataTableDao = Mockito.mock(MetadataTableDao.class);
        DaoFactory daoFactory = Mockito.mock(DaoFactory.class);
        Mockito.when(daoFactory.getChangeStreamDao()).thenReturn(this.changeStreamDao);
        Mockito.when(daoFactory.getMetadataTableDao()).thenReturn(this.metadataTableDao);
        Mockito.when(daoFactory.getChangeStreamName()).thenReturn("test-id");

        ChangeStreamMetrics metrics = Mockito.mock(ChangeStreamMetrics.class);
        this.sizeEstimator = Mockito.mock(SizeEstimator.class);
        // Left raw on purpose: the estimator's type parameter is not visible from this file.
        // NOTE(review): the meaning of the 10 and 1 constructor arguments should be confirmed
        // against BytesThroughputEstimator.
        BytesThroughputEstimator throughputEstimator =
                new BytesThroughputEstimator(10, this.sizeEstimator, 1);

        ChangeStreamAction changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator);
        ReadChangeStreamPartitionAction readPartitionAction =
                new ReadChangeStreamPartitionAction(
                        this.metadataTableDao,
                        this.changeStreamDao,
                        metrics,
                        changeStreamAction,
                        oneSecond);

        ActionFactory actionFactory = Mockito.mock(ActionFactory.class);
        Mockito.when(actionFactory.changeStreamAction(metrics, throughputEstimator))
                .thenReturn(changeStreamAction);
        Mockito.when(
                        actionFactory.readChangeStreamPartitionAction(
                                this.metadataTableDao,
                                this.changeStreamDao,
                                metrics,
                                changeStreamAction,
                                oneSecond))
                .thenReturn(readPartitionAction);

        this.doFn = new ReadChangeStreamPartitionDoFn(oneSecond, daoFactory, actionFactory, metrics);
        this.doFn.setThroughputEstimator(throughputEstimator);
    }

    /**
     * Streams three identical mocked mutations through {@code processElement} while the tracker
     * accepts only the first two claims, then checks the value reported by {@code getSize} and
     * that exactly two records were emitted.
     *
     * @throws IOException if a collaborator declares it
     * @throws InterruptedException if {@code processElement} is interrupted
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // Mockito.mock(Class) cannot carry type arguments.
    public void testProcessElementAndGetSize() throws IOException, InterruptedException {
        final long lagSeconds = 10L;
        final long mockedRecordSize = 100L;
        final long claimedRecords = 2L;

        Instant tenSecondsAgo = Instant.now().minus(Duration.standardSeconds(lagSeconds));
        Range.ByteStringRange partitionRange = Range.ByteStringRange.create("", "");
        ChangeStreamContinuationToken continuationToken =
                ChangeStreamContinuationToken.create(partitionRange, "test");
        PartitionRecord partition =
                new PartitionRecord(
                        partitionRange,
                        tenSecondsAgo,
                        "uid-a",
                        tenSecondsAgo,
                        Collections.emptyList(),
                        Instant.now().plus(Duration.standardSeconds(60L)));

        Mockito.when(
                        this.sizeEstimator.sizeOf(
                                ArgumentMatchers.<KV<ByteString, ChangeStreamMutation>>any()))
                .thenReturn(mockedRecordSize);

        ReadChangeStreamPartitionProgressTracker tracker =
                Mockito.mock(ReadChangeStreamPartitionProgressTracker.class);
        Mockito.when(tracker.currentRestriction()).thenReturn(new StreamProgress());
        // Left raw on purpose: the DoFn's output element type is not visible from this file.
        DoFn.OutputReceiver receiver = Mockito.mock(DoFn.OutputReceiver.class);
        ManualWatermarkEstimator<Instant> watermarkEstimator =
                Mockito.mock(ManualWatermarkEstimator.class);
        this.doFn.setup();

        // The mock must be typed as ChangeStreamMutation: the accessors stubbed below are
        // declared on the mutation type, not on the ChangeStreamRecord interface.
        ByteString rowKey = ByteString.copyFromUtf8("a");
        ChangeStreamMutation mutation = Mockito.mock(ChangeStreamMutation.class);
        Mockito.when(mutation.getRowKey()).thenReturn(rowKey);
        Mockito.when(mutation.getEstimatedLowWatermark())
                .thenReturn(TimestampConverter.toThreetenInstant(tenSecondsAgo));
        Mockito.when(mutation.getToken()).thenReturn(continuationToken.getToken());
        Mockito.when(mutation.getCommitTimestamp())
                .thenReturn(TimestampConverter.toThreetenInstant(tenSecondsAgo));

        Mockito.when(
                        this.metadataTableDao.lockAndRecordPartition(
                                ArgumentMatchers.<PartitionRecord>any()))
                .thenReturn(true);

        // The stream offers three records; the third claim below is refused.
        ServerStream<ChangeStreamRecord> stream = Mockito.mock(ServerStream.class);
        Iterator<ChangeStreamRecord> records = Mockito.mock(Iterator.class);
        Mockito.when(records.hasNext()).thenReturn(true, true, true);
        Mockito.when(records.next()).thenReturn(mutation, mutation, mutation);
        Mockito.when(stream.iterator()).thenReturn(records);
        Mockito.when(
                        this.changeStreamDao.readChangeStreamPartition(
                                ArgumentMatchers.<PartitionRecord>any(),
                                ArgumentMatchers.<StreamProgress>any(),
                                ArgumentMatchers.<Instant>any(),
                                ArgumentMatchers.<Duration>any(),
                                ArgumentMatchers.anyBoolean()))
                .thenReturn(stream);

        Mockito.when(watermarkEstimator.getState()).thenReturn(tenSecondsAgo);
        Mockito.when(tracker.tryClaim(ArgumentMatchers.<StreamProgress>any()))
                .thenReturn(true, true, false);

        this.doFn.processElement(partition, tracker, receiver, watermarkEstimator);

        // Same arithmetic as before, ((100 * 2) / 10) * 10 = 200, now passed in JUnit's
        // (expected, actual, delta) order so a failure message labels the values correctly.
        // NOTE(review): presumably "throughput of the claimed records x watermark lag" - confirm
        // against ReadChangeStreamPartitionDoFn.getSize.
        long expectedSize = ((mockedRecordSize * claimedRecords) / lagSeconds) * lagSeconds;
        double reportedSize = this.doFn.getSize(new StreamProgress(continuationToken, tenSecondsAgo));
        Assert.assertEquals(expectedSize, reportedSize, 0.1d);

        Mockito.verify(receiver, Mockito.times(2))
                .outputWithTimestamp(KV.of(rowKey, mutation), Instant.EPOCH);
    }
}
