package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.InitialPipelineState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.class */
public class DetectNewPartitionsActionTest {

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();

    @Mock
    private ChangeStreamMetrics metrics;

    @Mock
    private GenerateInitialPartitionsAction generateInitialPartitionsAction;

    @Mock
    private ResumeFromPreviousPipelineAction resumeFromPreviousPipelineAction;
    private DetectNewPartitionsAction action;

    @Mock
    private RestrictionTracker<OffsetRange, Long> tracker;

    @Mock
    private DoFn.OutputReceiver<PartitionRecord> receiver;
    private MetadataTableDao metadataTableDao;
    private ManualWatermarkEstimator<Instant> watermarkEstimator;
    private Instant startTime;
    private Instant endTime;
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;

    @Captor
    ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void setUp() throws Exception {
        MetadataTableAdminDao metadataTableAdminDao = new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, UniqueIdGenerator.generateRowKeyPrefix(), "__change_stream_md_table");
        metadataTableAdminDao.createMetadataTable();
        metadataTableAdminDao.cleanUpPrefix();
        this.metadataTableDao = new MetadataTableDao(dataClient, metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix());
        this.startTime = Instant.now();
        this.endTime = this.startTime.plus(Duration.standardSeconds(10L));
        this.action = new DetectNewPartitionsAction(this.metrics, this.metadataTableDao, this.endTime, this.generateInitialPartitionsAction, this.resumeFromPreviousPipelineAction, new ProcessNewPartitionsAction(this.metrics, this.metadataTableDao, this.endTime));
        this.watermarkEstimator = new WatermarkEstimators.Manual(this.startTime);
    }

    @Test
    public void testUpdateWatermarkAfterCheckpoint() throws Exception {
        Instant instant = this.endTime;
        OffsetRange offsetRange = new OffsetRange(1L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        this.metadataTableDao.updateDetectNewPartitionWatermark(instant);
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(instant, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(instant, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
    }

    @Test
    public void testUpdateWatermarkOnEvenCountAfter10Seconds() throws Exception {
        OffsetRange offsetRange = new OffsetRange(2L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        dataClient.mutateRow(RowMutation.create("__change_stream_md_table", this.metadataTableDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)).setCell("watermark", "latest", Instant.now().minus(Duration.standardSeconds(10L)).getMillis() * 1000, this.startTime.getMillis()));
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(plus, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getTo()));
        Assert.assertEquals(plus, this.watermarkEstimator.currentWatermark());
    }

    @Test
    public void testDoNotUpdateWatermarkLessThan10s() throws Exception {
        OffsetRange offsetRange = new OffsetRange(2L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        dataClient.mutateRow(RowMutation.create("__change_stream_md_table", this.metadataTableDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)).setCell("watermark", "latest", Instant.now().minus(Duration.standardSeconds(1L)).getMillis() * 1000, this.startTime.getMillis()));
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(this.startTime, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
    }

    @Test
    public void testDoNotUpdateWatermarkOnOddCount() throws Exception {
        OffsetRange offsetRange = new OffsetRange(1L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        dataClient.mutateRow(RowMutation.create("__change_stream_md_table", this.metadataTableDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)).setCell("watermark", "latest", Instant.now().minus(Duration.standardSeconds(10L)).getMillis() * 1000, this.startTime.getMillis()));
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(this.startTime, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
    }

    @Test
    public void testAdvanceWatermarkWithAllPartitions() throws Exception {
        OffsetRange offsetRange = new OffsetRange(10L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getTo())))).thenReturn(true);
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        Range.ByteStringRange create = Range.ByteStringRange.create("", "b");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "");
        Instant plus2 = this.endTime.plus(Duration.millis(1L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create2, plus2, UniqueIdGenerator.getNextId(), plus2, Collections.emptyList(), (Instant) null));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getFrom()));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(plus2, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getTo()));
        Assert.assertEquals(plus2, this.watermarkEstimator.currentWatermark());
    }

    @Test
    public void testAdvanceWatermarkWithMissingPartitions() throws Exception {
        OffsetRange offsetRange = new OffsetRange(2L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        this.metadataTableDao.updateWatermark(Range.ByteStringRange.create("", "b"), this.endTime.plus(Duration.millis(100L)), (ChangeStreamContinuationToken) null);
        this.metadataTableDao.updateWatermark(Range.ByteStringRange.create("b", "c"), this.endTime.plus(Duration.millis(1L)), (ChangeStreamContinuationToken) null);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getFrom()));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
    }

    @Test
    public void testAdvanceWatermarkWithNewPartitions() throws Exception {
        OffsetRange offsetRange = new OffsetRange(2L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getTo())))).thenReturn(true);
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertNull(this.metadataTableDao.readDetectNewPartitionsState());
        Range.ByteStringRange create = Range.ByteStringRange.create("", "b");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "c");
        Instant plus2 = this.endTime.plus(Duration.millis(1L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create2, plus2, UniqueIdGenerator.getNextId(), plus2, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create3 = Range.ByteStringRange.create("c", "");
        this.metadataTableDao.writeNewPartition(new NewPartition(create3, Collections.singletonList(ChangeStreamContinuationToken.create(create3, "token")), plus2));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getFrom()));
        Assert.assertEquals(this.startTime, this.watermarkEstimator.currentWatermark());
        Assert.assertEquals(plus2, this.metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((RestrictionTracker) Mockito.verify(this.tracker, Mockito.times(1))).tryClaim(Long.valueOf(offsetRange.getTo()));
        Assert.assertEquals(plus2, this.watermarkEstimator.currentWatermark());
    }

    @Test
    public void testProcessSplitNewPartitions() throws Exception {
        OffsetRange offsetRange = new OffsetRange(1L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Instant now = Instant.now();
        Range.ByteStringRange create = Range.ByteStringRange.create("", "a");
        this.metadataTableDao.writeNewPartition(new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(create, Range.ByteStringRange.serializeToByteString(create).toStringUtf8())), now));
        Range.ByteStringRange create2 = Range.ByteStringRange.create("k", "");
        this.metadataTableDao.writeNewPartition(new NewPartition(create2, Collections.singletonList(ChangeStreamContinuationToken.create(create2, Range.ByteStringRange.serializeToByteString(create2).toStringUtf8())), now));
        Range.ByteStringRange create3 = Range.ByteStringRange.create("a", "k");
        this.metadataTableDao.writeNewPartition(new NewPartition(create3, Collections.singletonList(ChangeStreamContinuationToken.create(create3, Range.ByteStringRange.serializeToByteString(create3).toStringUtf8())), now));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(3))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        MatcherAssert.assertThat((List) this.partitionRecordArgumentCaptor.getAllValues().stream().map((v0) -> {
            return v0.getPartition();
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(Arrays.asList(create, create2, create3).toArray()));
        for (PartitionRecord partitionRecord : this.partitionRecordArgumentCaptor.getAllValues()) {
            Assert.assertEquals(partitionRecord.getParentLowWatermark(), now);
            Assert.assertNotNull(partitionRecord.getEndTime());
            Assert.assertEquals(partitionRecord.getEndTime(), this.endTime);
            Assert.assertNotNull(partitionRecord.getChangeStreamContinuationTokens());
            Assert.assertEquals(1L, partitionRecord.getChangeStreamContinuationTokens().size());
            Assert.assertEquals(((ChangeStreamContinuationToken) partitionRecord.getChangeStreamContinuationTokens().get(0)).getToken(), Range.ByteStringRange.serializeToByteString(((ChangeStreamContinuationToken) partitionRecord.getChangeStreamContinuationTokens().get(0)).getPartition()).toStringUtf8());
        }
        Assert.assertTrue(this.metadataTableDao.readNewPartitions().isEmpty());
    }

    @Test
    public void testProcessMergeNewPartitions() throws Exception {
        OffsetRange offsetRange = new OffsetRange(1L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "c");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("a", "b");
        Instant instant = this.startTime;
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(create2, "ab");
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(create3), instant);
        Range.ByteStringRange create4 = Range.ByteStringRange.create("b", "c");
        Instant plus = this.startTime.plus(Duration.millis(10L));
        ChangeStreamContinuationToken create5 = ChangeStreamContinuationToken.create(create4, "bc");
        NewPartition newPartition2 = new NewPartition(create, Collections.singletonList(create5), plus);
        this.metadataTableDao.writeNewPartition(newPartition);
        this.metadataTableDao.writeNewPartition(newPartition2);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(1))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        Assert.assertEquals(create, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getPartition());
        Assert.assertEquals(instant, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getParentLowWatermark());
        Assert.assertEquals(this.endTime, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getEndTime());
        MatcherAssert.assertThat(((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getChangeStreamContinuationTokens(), Matchers.containsInAnyOrder(new ChangeStreamContinuationToken[]{create3, create5}));
        Assert.assertTrue(this.metadataTableDao.readNewPartitions().isEmpty());
    }

    @Test
    public void testProcessMergeNewPartitionsMissingParent() throws Exception {
        OffsetRange offsetRange = new OffsetRange(1L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Instant instant = this.startTime;
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(create, "ab");
        Range.ByteStringRange create3 = Range.ByteStringRange.create("a", "c");
        this.metadataTableDao.writeNewPartition(new NewPartition(create3, Collections.singletonList(create2), instant));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.never())).outputWithTimestamp((PartitionRecord) ArgumentMatchers.any(), (Instant) ArgumentMatchers.any());
        Assert.assertEquals(1L, this.metadataTableDao.readNewPartitions().size());
        Range.ByteStringRange create4 = Range.ByteStringRange.create("b", "c");
        Instant plus = this.startTime.plus(Duration.millis(10L));
        ChangeStreamContinuationToken create5 = ChangeStreamContinuationToken.create(create4, "bc");
        this.metadataTableDao.writeNewPartition(new NewPartition(create3, Collections.singletonList(create5), plus));
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(1))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        Assert.assertEquals(create3, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getPartition());
        Assert.assertEquals(instant, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getParentLowWatermark());
        Assert.assertEquals(this.endTime, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getEndTime());
        MatcherAssert.assertThat(((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getChangeStreamContinuationTokens(), Matchers.containsInAnyOrder(new ChangeStreamContinuationToken[]{create2, create5}));
        Assert.assertTrue(this.metadataTableDao.readNewPartitions().isEmpty());
    }

    @Test
    public void testMissingPartitionReconciled() throws Exception {
        OffsetRange offsetRange = new OffsetRange(52L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Range.ByteStringRange create = Range.ByteStringRange.create("", "a");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "");
        Instant plus2 = this.endTime.plus(Duration.millis(1L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create2, plus2, UniqueIdGenerator.getNextId(), plus2, Collections.emptyList(), (Instant) null));
        HashMap hashMap = new HashMap();
        Range.ByteStringRange create3 = Range.ByteStringRange.create("a", "b");
        hashMap.put(create3, Instant.now().minus(Duration.standardSeconds(1199L)));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.never())).outputWithTimestamp((PartitionRecord) ArgumentMatchers.any(), (Instant) ArgumentMatchers.any());
        Assert.assertEquals(1L, this.metadataTableDao.readDetectNewPartitionMissingPartitions().size());
        Thread.sleep(1001L);
        OffsetRange offsetRange2 = new OffsetRange(53L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange2);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange2.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.never())).outputWithTimestamp((PartitionRecord) ArgumentMatchers.any(), (Instant) ArgumentMatchers.any());
        Assert.assertEquals(1L, this.metadataTableDao.readDetectNewPartitionMissingPartitions().size());
        OffsetRange offsetRange3 = new OffsetRange(54L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange3);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange3.getFrom())))).thenReturn(true);
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        Assert.assertEquals(0L, this.metadataTableDao.readDetectNewPartitionMissingPartitions().size());
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(1))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        Assert.assertEquals(create3, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getPartition());
        Assert.assertEquals(this.watermarkEstimator.currentWatermark(), ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getParentLowWatermark());
        Assert.assertEquals(this.endTime, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getEndTime());
        Assert.assertNotNull(((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getStartTime());
        Assert.assertEquals(this.startTime, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getStartTime());
    }

    @Test
    public void testBackToBackReconcile() throws Exception {
        OffsetRange offsetRange = new OffsetRange(52L, Long.MAX_VALUE);
        Mockito.when((OffsetRange) this.tracker.currentRestriction()).thenReturn(offsetRange);
        Mockito.when(Boolean.valueOf(this.tracker.tryClaim(Long.valueOf(offsetRange.getFrom())))).thenReturn(true);
        Range.ByteStringRange create = Range.ByteStringRange.create("", "a");
        Instant plus = this.endTime.plus(Duration.millis(100L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, plus, UniqueIdGenerator.getNextId(), plus, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "");
        Instant plus2 = this.endTime.plus(Duration.millis(1L));
        this.metadataTableDao.lockAndRecordPartition(new PartitionRecord(create2, plus2, UniqueIdGenerator.getNextId(), plus2, Collections.emptyList(), (Instant) null));
        Range.ByteStringRange create3 = Range.ByteStringRange.create("a", "b");
        Instant instant = this.startTime;
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(create3, "ab");
        this.metadataTableDao.writeNewPartition(new NewPartition(Range.ByteStringRange.create("a", "c"), Collections.singletonList(create4), instant));
        HashMap hashMap = new HashMap();
        hashMap.put(create3, Instant.now().minus(Duration.standardSeconds(121L)));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        Assert.assertEquals(1L, this.metadataTableDao.readNewPartitions().size());
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(1))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        Assert.assertEquals(create3, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getPartition());
        Assert.assertEquals(instant, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getParentLowWatermark());
        Assert.assertEquals(this.endTime, ((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getEndTime());
        Assert.assertEquals(((PartitionRecord) this.partitionRecordArgumentCaptor.getValue()).getChangeStreamContinuationTokens(), Collections.singletonList(create4));
        Assert.assertTrue(this.metadataTableDao.readNewPartitions().isEmpty());
        Assert.assertTrue(this.metadataTableDao.readDetectNewPartitionMissingPartitions().isEmpty());
        Mockito.clearInvocations(new DoFn.OutputReceiver[]{this.receiver});
        Assert.assertEquals(DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(100L)), this.action.run(this.tracker, this.receiver, this.watermarkEstimator, new InitialPipelineState(this.startTime, false)));
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.never())).outputWithTimestamp((PartitionRecord) ArgumentMatchers.any(), (Instant) ArgumentMatchers.any());
    }
}
