package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.class */
public class ChangeStreamActionTest {
    private ChangeStreamMetrics metrics;
    private ChangeStreamAction action;
    private RestrictionTracker<StreamProgress, StreamProgress> tracker;
    private PartitionRecord partitionRecord;
    private DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver;
    private ManualWatermarkEstimator<Instant> watermarkEstimator;
    private ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator;

    @Before
    public void setUp() {
        this.metrics = (ChangeStreamMetrics) Mockito.mock(ChangeStreamMetrics.class);
        this.tracker = (RestrictionTracker) Mockito.mock(ReadChangeStreamPartitionProgressTracker.class);
        this.partitionRecord = (PartitionRecord) Mockito.mock(PartitionRecord.class);
        this.receiver = (DoFn.OutputReceiver) Mockito.mock(DoFn.OutputReceiver.class);
        this.watermarkEstimator = (ManualWatermarkEstimator) Mockito.mock(ManualWatermarkEstimator.class);
        this.throughputEstimator = (ThroughputEstimator) Mockito.mock(ThroughputEstimator.class);
        this.action = new ChangeStreamAction(this.metrics, this.throughputEstimator);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim((StreamProgress) ArgumentMatchers.any()))).thenReturn(true);
    }

    @Test
    public void testHeartBeat() {
        Instant ofEpochSecond = Instant.ofEpochSecond(1000L);
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("a", "b"), SpannerIOReadTest.PROJECT_ID);
        Heartbeat heartbeat = (Heartbeat) Mockito.mock(Heartbeat.class);
        Mockito.when(heartbeat.getEstimatedLowWatermark()).thenReturn(TimestampConverter.toThreetenInstant(ofEpochSecond));
        Mockito.when(heartbeat.getChangeStreamContinuationToken()).thenReturn(create);
        Assert.assertFalse(this.action.run(this.partitionRecord, heartbeat, this.tracker, this.receiver, this.watermarkEstimator, false).isPresent());
        ((ChangeStreamMetrics) Mockito.verify(this.metrics)).incHeartbeatCount();
        ((ManualWatermarkEstimator) Mockito.verify(this.watermarkEstimator)).setWatermark((Instant) ArgumentMatchers.eq(ofEpochSecond));
        ((RestrictionTracker) Mockito.verify(this.tracker)).tryClaim((StreamProgress) ArgumentMatchers.eq(new StreamProgress(create, ofEpochSecond)));
        ((ThroughputEstimator) Mockito.verify(this.throughputEstimator, Mockito.never())).update((Instant) ArgumentMatchers.any(), (KV) ArgumentMatchers.any());
    }

    @Test
    public void testCloseStreamResume() {
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("a", "b"), SpannerIOReadTest.PROJECT_ID);
        CloseStream closeStream = (CloseStream) Mockito.mock(CloseStream.class);
        Mockito.when(closeStream.getStatus()).thenReturn(Status.fromProto(com.google.rpc.Status.newBuilder().setCode(11).build()));
        Mockito.when(closeStream.getChangeStreamContinuationTokens()).thenReturn(Collections.singletonList(create));
        Optional run = this.action.run(this.partitionRecord, closeStream, this.tracker, this.receiver, this.watermarkEstimator, false);
        Assert.assertTrue(run.isPresent());
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), run.get());
        ((ChangeStreamMetrics) Mockito.verify(this.metrics)).incClosestreamCount();
        ((RestrictionTracker) Mockito.verify(this.tracker)).tryClaim((StreamProgress) ArgumentMatchers.eq(new StreamProgress(closeStream)));
    }

    @Test
    public void testChangeStreamMutationUser() {
        Mockito.when(this.partitionRecord.getPartition()).thenReturn(Range.ByteStringRange.create("", ""));
        Instant ofEpochSecond = Instant.ofEpochSecond(1000L);
        Instant ofEpochSecond2 = Instant.ofEpochSecond(500L);
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("", ""), SpannerIOReadTest.PROJECT_ID);
        ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) Mockito.mock(ChangeStreamMutation.class);
        Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(TimestampConverter.toThreetenInstant(ofEpochSecond));
        Mockito.when(changeStreamMutation.getToken()).thenReturn(SpannerIOReadTest.PROJECT_ID);
        Mockito.when(changeStreamMutation.getEstimatedLowWatermark()).thenReturn(TimestampConverter.toThreetenInstant(ofEpochSecond2));
        Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER);
        KV of = KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
        Assert.assertFalse(this.action.run(this.partitionRecord, changeStreamMutation, this.tracker, this.receiver, this.watermarkEstimator, false).isPresent());
        ((ChangeStreamMetrics) Mockito.verify(this.metrics)).incChangeStreamMutationUserCounter();
        ((ChangeStreamMetrics) Mockito.verify(this.metrics, Mockito.never())).incChangeStreamMutationGcCounter();
        ((RestrictionTracker) Mockito.verify(this.tracker)).tryClaim((StreamProgress) ArgumentMatchers.eq(new StreamProgress(create, ofEpochSecond2)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver)).outputWithTimestamp((KV) ArgumentMatchers.eq(of), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        ((ManualWatermarkEstimator) Mockito.verify(this.watermarkEstimator)).setWatermark((Instant) ArgumentMatchers.eq(ofEpochSecond2));
        ((ThroughputEstimator) Mockito.verify(this.throughputEstimator)).update((Instant) ArgumentMatchers.any(), (KV) ArgumentMatchers.eq(of));
    }

    @Test
    public void testChangeStreamMutationGc() {
        Mockito.when(this.partitionRecord.getPartition()).thenReturn(Range.ByteStringRange.create("", ""));
        Instant ofEpochSecond = Instant.ofEpochSecond(1000L);
        Instant ofEpochSecond2 = Instant.ofEpochSecond(500L);
        ChangeStreamContinuationToken create = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("", ""), SpannerIOReadTest.PROJECT_ID);
        ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) Mockito.mock(ChangeStreamMutation.class);
        Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(TimestampConverter.toThreetenInstant(ofEpochSecond));
        Mockito.when(changeStreamMutation.getToken()).thenReturn(SpannerIOReadTest.PROJECT_ID);
        Mockito.when(changeStreamMutation.getEstimatedLowWatermark()).thenReturn(TimestampConverter.toThreetenInstant(ofEpochSecond2));
        Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
        KV of = KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
        Assert.assertFalse(this.action.run(this.partitionRecord, changeStreamMutation, this.tracker, this.receiver, this.watermarkEstimator, false).isPresent());
        ((ChangeStreamMetrics) Mockito.verify(this.metrics)).incChangeStreamMutationGcCounter();
        ((ChangeStreamMetrics) Mockito.verify(this.metrics, Mockito.never())).incChangeStreamMutationUserCounter();
        ((RestrictionTracker) Mockito.verify(this.tracker)).tryClaim((StreamProgress) ArgumentMatchers.eq(new StreamProgress(create, ofEpochSecond2)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver)).outputWithTimestamp((KV) ArgumentMatchers.eq(of), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        ((ManualWatermarkEstimator) Mockito.verify(this.watermarkEstimator)).setWatermark((Instant) ArgumentMatchers.eq(ofEpochSecond2));
        ((ThroughputEstimator) Mockito.verify(this.throughputEstimator)).update((Instant) ArgumentMatchers.any(), (KV) ArgumentMatchers.eq(of));
    }
}
