/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;

import com.google.cloud.Timestamp;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit test for {@link ReadChangeStreamPartitionDoFn}.
 *
 * <p>Every collaborator of the DoFn (factories, DAOs, mappers, record actions) is a Mockito mock,
 * so the test only observes how {@code processElement} delegates to the
 * {@link QueryChangeStreamAction} obtained from the {@link ActionFactory}.
 */
@RunWith(JUnit4.class)
public class ReadChangeStreamPartitionDoFnTest {
    private static final String PARTITION_TOKEN = "partitionToken";
    private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeSecondsAndNanos(10L, 20);
    private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeSecondsAndNanos(30L, 40);
    private static final long PARTITION_HEARTBEAT_MILLIS = 30000L;

    private ReadChangeStreamPartitionDoFn doFn;
    private PartitionMetadata partition;
    private TimestampRange restriction;
    private RestrictionTracker<TimestampRange, Timestamp> tracker;
    private DoFn.OutputReceiver<DataChangeRecord> receiver;
    private ManualWatermarkEstimator<Instant> watermarkEstimator;
    private DoFn.BundleFinalizer bundleFinalizer;
    private DataChangeRecordAction dataChangeRecordAction;
    private HeartbeatRecordAction heartbeatRecordAction;
    private ChildPartitionsRecordAction childPartitionsRecordAction;
    private QueryChangeStreamAction queryChangeStreamAction;

    /**
     * Builds the DoFn under test on top of mocked factories and stubs those factories so that
     * {@code doFn.setup()} receives the mock DAOs, mappers and actions held by this test.
     */
    // Mockito.mock(Foo.class) yields a raw Foo; assigning it to the parameterized fields below is
    // an unchecked conversion, which is safe here because the mocks carry no real type state.
    @SuppressWarnings("unchecked")
    @Before
    public void setUp() {
        DaoFactory daoFactory = Mockito.mock(DaoFactory.class);
        MapperFactory mapperFactory = Mockito.mock(MapperFactory.class);
        ChangeStreamMetrics metrics = Mockito.mock(ChangeStreamMetrics.class);
        ThroughputEstimator throughputEstimator = Mockito.mock(ThroughputEstimator.class);
        ActionFactory actionFactory = Mockito.mock(ActionFactory.class);
        PartitionMetadataDao partitionMetadataDao = Mockito.mock(PartitionMetadataDao.class);
        ChangeStreamDao changeStreamDao = Mockito.mock(ChangeStreamDao.class);
        ChangeStreamRecordMapper changeStreamRecordMapper = Mockito.mock(ChangeStreamRecordMapper.class);
        PartitionMetadataMapper partitionMetadataMapper = Mockito.mock(PartitionMetadataMapper.class);
        this.dataChangeRecordAction = Mockito.mock(DataChangeRecordAction.class);
        this.heartbeatRecordAction = Mockito.mock(HeartbeatRecordAction.class);
        this.childPartitionsRecordAction = Mockito.mock(ChildPartitionsRecordAction.class);
        this.queryChangeStreamAction = Mockito.mock(QueryChangeStreamAction.class);

        this.doFn = new ReadChangeStreamPartitionDoFn(
            daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);

        // Fixture partition; the heartbeat interval comes from the shared constant rather than a
        // duplicated literal so the two cannot drift apart.
        this.partition = PartitionMetadata.newBuilder()
            .setPartitionToken(PARTITION_TOKEN)
            .setParentTokens(Sets.newHashSet("parentToken"))
            .setStartTimestamp(PARTITION_START_TIMESTAMP)
            .setEndTimestamp(PARTITION_END_TIMESTAMP)
            .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
            .setState(PartitionMetadata.State.SCHEDULED)
            .setWatermark(PARTITION_START_TIMESTAMP)
            .setScheduledAt(Timestamp.now())
            .build();

        this.restriction = Mockito.mock(TimestampRange.class);
        this.tracker = Mockito.mock(RestrictionTracker.class);
        this.receiver = Mockito.mock(DoFn.OutputReceiver.class);
        this.watermarkEstimator = Mockito.mock(ManualWatermarkEstimator.class);
        this.bundleFinalizer = Mockito.mock(DoFn.BundleFinalizer.class);

        Mockito.when(this.tracker.currentRestriction()).thenReturn(this.restriction);
        Mockito.when(daoFactory.getPartitionMetadataDao()).thenReturn(partitionMetadataDao);
        Mockito.when(daoFactory.getChangeStreamDao()).thenReturn(changeStreamDao);
        Mockito.when(mapperFactory.changeStreamRecordMapper()).thenReturn(changeStreamRecordMapper);
        Mockito.when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper);
        Mockito.when(actionFactory.dataChangeRecordAction()).thenReturn(this.dataChangeRecordAction);
        Mockito.when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(this.heartbeatRecordAction);
        Mockito.when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics))
            .thenReturn(this.childPartitionsRecordAction);
        // Stubbed with the exact mock instances: the query action is only handed back when the
        // factory is called with precisely the collaborators created above.
        Mockito.when(
                actionFactory.queryChangeStreamAction(
                    changeStreamDao,
                    partitionMetadataDao,
                    changeStreamRecordMapper,
                    partitionMetadataMapper,
                    this.dataChangeRecordAction,
                    this.heartbeatRecordAction,
                    this.childPartitionsRecordAction,
                    metrics,
                    throughputEstimator))
            .thenReturn(this.queryChangeStreamAction);

        this.doFn.setup();
    }

    /**
     * Checks that {@code processElement} hands its arguments to the query action unchanged, returns
     * the continuation produced by that action, and does not itself invoke any record action or
     * claim a position on the tracker.
     */
    @Test
    public void testQueryChangeStreamMode() {
        Mockito.when(
                this.queryChangeStreamAction.run(
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any()))
            .thenReturn(DoFn.ProcessContinuation.stop());

        DoFn.ProcessContinuation result = this.doFn.processElement(
            this.partition, this.tracker, this.receiver, this.watermarkEstimator, this.bundleFinalizer);

        Assert.assertEquals(DoFn.ProcessContinuation.stop(), result);
        Mockito.verify(this.queryChangeStreamAction)
            .run(this.partition, this.tracker, this.receiver, this.watermarkEstimator, this.bundleFinalizer);

        // The query action is a mock, so none of the per-record actions may have been triggered.
        Mockito.verify(this.dataChangeRecordAction, Mockito.never())
            .run(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        Mockito.verify(this.heartbeatRecordAction, Mockito.never())
            .run(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        Mockito.verify(this.childPartitionsRecordAction, Mockito.never())
            .run(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
        Mockito.verify(this.tracker, Mockito.never()).tryClaim(ArgumentMatchers.any());
    }
}

