package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsActionTest.class */
/**
 * Unit tests for {@link DetectNewPartitionsAction}.
 *
 * <p>Every collaborator (DAO, mapper, metrics, restriction tracker, output receiver and watermark
 * estimator) is a Mockito mock, so each test only checks how the action interacts with them and
 * which {@link DoFn.ProcessContinuation} it returns.
 */
public class DetectNewPartitionsActionTest {
    private PartitionMetadataDao dao;
    private PartitionMetadataMapper mapper;
    private ChangeStreamMetrics metrics;
    private Duration resumeDuration;
    private RestrictionTracker<TimestampRange, Timestamp> tracker;
    private TimestampRange restriction;
    private DoFn.OutputReceiver<PartitionMetadata> receiver;
    private ManualWatermarkEstimator<Instant> watermarkEstimator;
    private DetectNewPartitionsAction action;

    /**
     * Creates fresh mocks, builds the action under test with a one second resume delay and makes
     * the tracker hand out the mocked restriction.
     */
    // Mockito.mock(Class) cannot carry type arguments, so assigning the mocks of generic
    // interfaces to parameterized fields is unavoidably unchecked.
    @SuppressWarnings("unchecked")
    @Before
    public void setUp() throws Exception {
        this.dao = Mockito.mock(PartitionMetadataDao.class);
        this.mapper = Mockito.mock(PartitionMetadataMapper.class);
        this.metrics = Mockito.mock(ChangeStreamMetrics.class);
        this.resumeDuration = Duration.standardSeconds(1L);
        this.tracker = Mockito.mock(RestrictionTracker.class);
        this.restriction = Mockito.mock(TimestampRange.class);
        this.receiver = Mockito.mock(DoFn.OutputReceiver.class);
        this.watermarkEstimator = Mockito.mock(ManualWatermarkEstimator.class);
        this.action = new DetectNewPartitionsAction(this.dao, this.mapper, this.metrics, this.resumeDuration);

        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.restriction);
    }

    /**
     * Two partitions are read from the DAO: the action is expected to resume after the configured
     * delay, set the watermark to the unfinished minimum watermark and output both partitions
     * with that same timestamp.
     */
    @Test
    public void testSchedulesPartitionsAndResumesWhenPartitionsWereCreated() {
        final Timestamp from = Timestamp.ofTimeMicroseconds(10L);
        final Timestamp minWatermark = Timestamp.ofTimeMicroseconds(20L);
        final Instant expectedWatermark = new Instant(minWatermark.toSqlTimestamp());
        final Timestamp partitionCreatedAt = Timestamp.ofTimeMicroseconds(15L);
        final Timestamp partitionScheduledAt = Timestamp.ofTimeMicroseconds(30L);
        final ResultSet resultSet = Mockito.mock(ResultSet.class);
        final PartitionMetadata partition1 = mockPartition("token1", partitionCreatedAt);
        final PartitionMetadata partition2 = mockPartition("token2", partitionCreatedAt);

        Mockito.when(this.restriction.getFrom()).thenReturn(from);
        Mockito.when(this.dao.getUnfinishedMinWatermark()).thenReturn(minWatermark);
        Mockito.when(this.dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet);
        Mockito.when(this.dao.updateToScheduled(Arrays.asList("token1", "token2")))
            .thenReturn(partitionScheduledAt);
        // Two rows, then end of the result set.
        Mockito.when(resultSet.next()).thenReturn(true, true, false);
        // The mocked result set is never stubbed to return a row, so the untyped any() matcher
        // is kept on purpose instead of a stricter type-based one.
        Mockito.when(this.mapper.from((Struct) ArgumentMatchers.any()))
            .thenReturn(partition1, partition2);
        Mockito.when(this.tracker.tryClaim(partitionCreatedAt)).thenReturn(true);

        final DoFn.ProcessContinuation continuation =
            this.action.run(this.tracker, this.receiver, this.watermarkEstimator);

        Assert.assertEquals(
            DoFn.ProcessContinuation.resume().withResumeDelay(this.resumeDuration), continuation);
        Mockito.verify(this.watermarkEstimator).setWatermark(expectedWatermark);
        Mockito.verify(this.receiver, Mockito.times(2))
            .outputWithTimestamp(ArgumentMatchers.any(), ArgumentMatchers.eq(expectedWatermark));
    }

    /**
     * The DAO returns an empty result set: the action is expected to resume after the configured
     * delay and still set the watermark, but output nothing.
     */
    @Test
    public void testDoesNothingWhenNoPartitionsWereCreated() {
        final Timestamp from = Timestamp.ofTimeMicroseconds(10L);
        final Timestamp minWatermark = Timestamp.ofTimeMicroseconds(20L);
        final Instant expectedWatermark = new Instant(minWatermark.toSqlTimestamp());
        final ResultSet resultSet = Mockito.mock(ResultSet.class);

        Mockito.when(this.restriction.getFrom()).thenReturn(from);
        Mockito.when(this.dao.getUnfinishedMinWatermark()).thenReturn(minWatermark);
        Mockito.when(this.dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet);
        Mockito.when(resultSet.next()).thenReturn(false);

        final DoFn.ProcessContinuation continuation =
            this.action.run(this.tracker, this.receiver, this.watermarkEstimator);

        Assert.assertEquals(
            DoFn.ProcessContinuation.resume().withResumeDelay(this.resumeDuration), continuation);
        Mockito.verify(this.watermarkEstimator).setWatermark(expectedWatermark);
        Mockito.verify(this.receiver, Mockito.never())
            .outputWithTimestamp(ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * The DAO reports no unfinished minimum watermark ({@code null}): the action is expected to
     * stop without touching the watermark estimator or the output receiver.
     */
    @Test
    public void testTerminatesWhenAllPartitionsAreFinished() {
        Mockito.when(this.restriction.getFrom()).thenReturn(Timestamp.ofTimeMicroseconds(10L));
        // The null must be typed as Timestamp to match the stubbed method's return type;
        // an Object-typed null does not type-check against the stubbing.
        Mockito.when(this.dao.getUnfinishedMinWatermark()).thenReturn((Timestamp) null);

        final DoFn.ProcessContinuation continuation =
            this.action.run(this.tracker, this.receiver, this.watermarkEstimator);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), continuation);
        Mockito.verify(this.watermarkEstimator, Mockito.never())
            .setWatermark(ArgumentMatchers.any());
        Mockito.verify(this.receiver, Mockito.never())
            .outputWithTimestamp(ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /** Builds a deep-stub partition mock that reports the given token and creation timestamp. */
    private static PartitionMetadata mockPartition(String token, Timestamp createdAt) {
        final PartitionMetadata partition =
            Mockito.mock(PartitionMetadata.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(partition.getPartitionToken()).thenReturn(token);
        Mockito.when(partition.getCreatedAt()).thenReturn(createdAt);
        return partition;
    }
}
