package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.class */
public class MetadataTableDaoTest {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDaoTest.class);

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
    private static MetadataTableDao metadataTableDao;
    private static MetadataTableAdminDao metadataTableAdminDao;
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;

    /* renamed from: org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDaoTest$1LockPartition, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest$1LockPartition.class */
    class C1LockPartition implements Runnable {
        final PartitionRecord partitionRecord;
        boolean locked = false;
        final /* synthetic */ Range.ByteStringRange val$partition;

        C1LockPartition(String str, Range.ByteStringRange byteStringRange) {
            this.val$partition = byteStringRange;
            this.partitionRecord = new PartitionRecord(this.val$partition, Collections.emptyList(), Instant.now(), Collections.emptyList());
            this.partitionRecord.setUuid(str);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Thread.sleep((int) (Math.random() * 1000.0d));
                if (MetadataTableDaoTest.metadataTableDao.lockAndRecordPartition(this.partitionRecord)) {
                    this.locked = true;
                }
            } catch (InterruptedException e) {
                MetadataTableDaoTest.LOG.error(e.toString());
            }
        }
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void before() {
        metadataTableAdminDao = new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, UniqueIdGenerator.generateRowKeyPrefix(), "__change_stream_md_table");
        metadataTableAdminDao.createMetadataTable();
        metadataTableDao = new MetadataTableDao(dataClient, metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix());
    }

    @Test
    public void testStreamPartitionRowKeyConversion() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Assert.assertEquals(create, metadataTableDao.convertStreamPartitionRowKeyToPartition(metadataTableDao.convertPartitionToStreamPartitionRowKey(create)));
    }

    @Test
    public void testStreamPartitionRowKeyConversionWithIllegalUtf8() throws InvalidProtocolBufferException {
        ByteString copyFrom = ByteString.copyFrom(new byte[]{-116});
        Range.ByteStringRange create = Range.ByteStringRange.create(copyFrom, copyFrom);
        Assert.assertEquals(create, metadataTableDao.convertStreamPartitionRowKeyToPartition(metadataTableDao.convertPartitionToStreamPartitionRowKey(create)));
    }

    @Test
    public void testNewPartitionRowKeyConversion() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Assert.assertEquals(create, metadataTableDao.convertNewPartitionRowKeyToPartition(metadataTableDao.convertPartitionToNewPartitionRowKey(create)));
    }

    @Test
    public void testNewPartitionConversionWithWithIllegalUtf8() throws InvalidProtocolBufferException {
        ByteString copyFrom = ByteString.copyFrom(new byte[]{-116});
        Range.ByteStringRange create = Range.ByteStringRange.create(copyFrom, copyFrom);
        Assert.assertEquals(create, metadataTableDao.convertNewPartitionRowKeyToPartition(metadataTableDao.convertPartitionToNewPartitionRowKey(create)));
    }

    @Test
    public void testLockPartitionRace() throws InterruptedException {
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        ByteString convertPartitionToStreamPartitionRowKey = metadataTableDao.convertPartitionToStreamPartitionRowKey(create);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            arrayList.add(new C1LockPartition(Integer.toString(i), create));
            arrayList2.add(new Thread((Runnable) arrayList.get(i)));
        }
        for (int i2 = 0; i2 < 1000; i2++) {
            ((Thread) arrayList2.get(i2)).start();
        }
        for (int i3 = 0; i3 < 1000; i3++) {
            ((Thread) arrayList2.get(i3)).join();
        }
        int i4 = -1;
        for (int i5 = 0; i5 < 1000; i5++) {
            if (((C1LockPartition) arrayList.get(i5)).locked) {
                if (i4 == -1) {
                    i4 = i5;
                } else {
                    Assert.fail("Multiple owner on the lock. Both " + i4 + " and " + i5 + " (and possibly more) think they hold the lock.");
                }
            }
        }
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), convertPartitionToStreamPartitionRowKey, Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch("lock")).filter(Filters.FILTERS.qualifier().exactMatch("latest")));
        Assert.assertEquals(1L, readRow.getCells().size());
        Assert.assertEquals(Integer.toString(i4), ((RowCell) readRow.getCells().get(0)).getValue().toStringUtf8());
        dataClient.mutateRow(RowMutation.create(metadataTableAdminDao.getTableId(), convertPartitionToStreamPartitionRowKey).deleteRow());
    }

    @Test
    public void testReadStreamPartitionsWithWatermark() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("", "a");
        PartitionRecord partitionRecord = new PartitionRecord(create, Instant.now(), UniqueIdGenerator.getNextId(), Instant.now(), Collections.emptyList(), (Instant) null);
        metadataTableDao.lockAndRecordPartition(partitionRecord);
        List readStreamPartitionsWithWatermark = metadataTableDao.readStreamPartitionsWithWatermark();
        Assert.assertEquals(1L, readStreamPartitionsWithWatermark.size());
        Assert.assertEquals(partitionRecord.getParentLowWatermark(), ((StreamPartitionWithWatermark) readStreamPartitionsWithWatermark.get(0)).getWatermark());
        Assert.assertEquals(partitionRecord.getPartition(), ((StreamPartitionWithWatermark) readStreamPartitionsWithWatermark.get(0)).getPartition());
        Instant now = Instant.now();
        metadataTableDao.updateWatermark(create, now, (ChangeStreamContinuationToken) null);
        List readStreamPartitionsWithWatermark2 = metadataTableDao.readStreamPartitionsWithWatermark();
        Assert.assertEquals(1L, readStreamPartitionsWithWatermark2.size());
        Assert.assertEquals(now, ((StreamPartitionWithWatermark) readStreamPartitionsWithWatermark2.get(0)).getWatermark());
    }

    @Test
    public void testNewPartitionsWriteRead() throws InvalidProtocolBufferException {
        Instant now = Instant.now();
        Range.ByteStringRange create = Range.ByteStringRange.create("", "a");
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(create, "EmptyA")), now);
        Range.ByteStringRange create2 = Range.ByteStringRange.create("a", "");
        NewPartition newPartition2 = new NewPartition(create2, Collections.singletonList(ChangeStreamContinuationToken.create(create2, "AEmpty")), now);
        metadataTableDao.writeNewPartition(newPartition);
        metadataTableDao.writeNewPartition(newPartition2);
        int i = 0;
        boolean z = false;
        boolean z2 = false;
        for (NewPartition newPartition3 : metadataTableDao.readNewPartitions()) {
            i++;
            if (newPartition3.getPartition().equals(create)) {
                z = true;
            } else if (newPartition3.getPartition().equals(create2)) {
                z2 = true;
            }
        }
        Assert.assertTrue(z);
        Assert.assertTrue(z2);
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testMarkNewPartitionForDeletion() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        metadataTableDao.writeNewPartition(new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "BC")), Instant.now()));
        List readNewPartitions = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions.get(0)).getPartition());
        metadataTableDao.markNewPartitionForDeletion((NewPartition) readNewPartitions.get(0));
        Assert.assertEquals(0L, metadataTableDao.readNewPartitions().size());
    }

    @Test
    public void testMarkNewPartitionForDeletionVisibleAfterOneMinute() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("B", "C");
        metadataTableDao.writeNewPartition(new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(create2, "BC")), Instant.now()));
        List readNewPartitions = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions.get(0)).getPartition());
        dataClient.mutateRow(RowMutation.create(metadataTableAdminDao.getTableId(), metadataTableDao.convertPartitionToNewPartitionRowKey(create)).setCell("should_delete", Range.ByteStringRange.serializeToByteString(create2), Instant.now().minus(Duration.standardSeconds(61L)).getMillis() * 1000, 1L));
        List readNewPartitions2 = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions2.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions2.get(0)).getPartition());
    }

    @Test
    public void testDeleteNewPartition() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        metadataTableDao.writeNewPartition(new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "BC")), Instant.now()));
        metadataTableDao.writeNewPartition(new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("A", "B"), "AB")), Instant.now()));
        List readNewPartitions = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions.get(0)).getPartition());
        Assert.assertFalse(metadataTableDao.deleteNewPartition((NewPartition) readNewPartitions.get(0)));
        Assert.assertEquals(1L, metadataTableDao.readNewPartitions().size());
        metadataTableDao.markNewPartitionForDeletion((NewPartition) readNewPartitions.get(0));
        Assert.assertTrue(metadataTableDao.deleteNewPartition((NewPartition) readNewPartitions.get(0)));
        Assert.assertEquals(0L, metadataTableDao.readNewPartitions().size());
    }

    @Test
    public void testPartialDeleteNewPartition() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "BC")), Instant.now());
        metadataTableDao.writeNewPartition(newPartition);
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("A", "B"), "AB");
        NewPartition newPartition2 = new NewPartition(create, Collections.singletonList(create2), Instant.now());
        metadataTableDao.writeNewPartition(newPartition2);
        List readNewPartitions = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions.get(0)).getPartition());
        Assert.assertEquals(2L, ((NewPartition) readNewPartitions.get(0)).getChangeStreamContinuationTokens().size());
        metadataTableDao.markNewPartitionForDeletion(newPartition);
        Assert.assertFalse(metadataTableDao.deleteNewPartition((NewPartition) readNewPartitions.get(0)));
        List readNewPartitions2 = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions2.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions2.get(0)).getPartition());
        Assert.assertEquals(1L, ((NewPartition) readNewPartitions2.get(0)).getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create2, ((NewPartition) readNewPartitions2.get(0)).getChangeStreamContinuationTokens().get(0));
        Assert.assertFalse(metadataTableDao.deleteNewPartition(newPartition2));
        List readNewPartitions3 = metadataTableDao.readNewPartitions();
        Assert.assertEquals(1L, readNewPartitions3.size());
        Assert.assertEquals(create, ((NewPartition) readNewPartitions3.get(0)).getPartition());
        Assert.assertEquals(1L, ((NewPartition) readNewPartitions3.get(0)).getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create2, ((NewPartition) readNewPartitions3.get(0)).getChangeStreamContinuationTokens().get(0));
        metadataTableDao.markNewPartitionForDeletion(newPartition2);
        Assert.assertTrue(metadataTableDao.deleteNewPartition(newPartition2));
        Assert.assertEquals(0L, metadataTableDao.readNewPartitions().size());
    }

    @Test
    public void testUpdateAndReadWatermark() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Instant now = Instant.now();
        metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, Instant.now(), UniqueIdGenerator.getNextId(), Instant.now(), Collections.emptyList(), (Instant) null));
        metadataTableDao.updateWatermark(create, now, (ChangeStreamContinuationToken) null);
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "c");
        Instant now2 = Instant.now();
        metadataTableDao.lockAndRecordPartition(new PartitionRecord(create2, Instant.now(), UniqueIdGenerator.getNextId(), Instant.now(), Collections.emptyList(), (Instant) null));
        metadataTableDao.updateWatermark(create2, now2, (ChangeStreamContinuationToken) null);
        MatcherAssert.assertThat(metadataTableDao.readStreamPartitionsWithWatermark(), Matchers.containsInAnyOrder(new StreamPartitionWithWatermark[]{new StreamPartitionWithWatermark(create, now), new StreamPartitionWithWatermark(create2, now2)}));
    }

    @Test
    public void testReadingExistingStreamPartitions() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        Instant now = Instant.now();
        PartitionRecord partitionRecord = new PartitionRecord(create, now, "1", now, Collections.emptyList(), (Instant) null);
        metadataTableDao.lockAndRecordPartition(partitionRecord);
        Range.ByteStringRange create2 = Range.ByteStringRange.create("B", "D");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "tokenBC");
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("C", "D"), "tokenCD");
        Instant now2 = Instant.now();
        PartitionRecord partitionRecord2 = new PartitionRecord(create2, Arrays.asList(create3, create4), "2", now2, Collections.emptyList(), (Instant) null);
        metadataTableDao.lockAndRecordPartition(partitionRecord2);
        MatcherAssert.assertThat(metadataTableDao.readAllStreamPartitions(), Matchers.containsInAnyOrder(new PartitionRecord[]{partitionRecord, partitionRecord2}));
        Instant plus = now2.plus(Duration.standardSeconds(10L));
        metadataTableDao.updateWatermark(create2, plus, (ChangeStreamContinuationToken) null);
        PartitionRecord partitionRecord3 = new PartitionRecord(create2, Arrays.asList(create3, create4), "2", plus, Collections.emptyList(), (Instant) null);
        MatcherAssert.assertThat(metadataTableDao.readAllStreamPartitions(), Matchers.containsInAnyOrder(new PartitionRecord[]{partitionRecord, partitionRecord3}));
        Instant plus2 = now.plus(Duration.standardMinutes(1L));
        ChangeStreamContinuationToken create5 = ChangeStreamContinuationToken.create(create, "tokenAB");
        metadataTableDao.updateWatermark(create, plus2, create5);
        PartitionRecord partitionRecord4 = new PartitionRecord(create, Collections.singletonList(create5), "1", plus2, Collections.emptyList(), (Instant) null);
        MatcherAssert.assertThat(metadataTableDao.readAllStreamPartitions(), Matchers.containsInAnyOrder(new PartitionRecord[]{partitionRecord4, partitionRecord3}));
        metadataTableDao.releaseStreamPartitionLockForDeletion(create2, partitionRecord2.getUuid());
        MatcherAssert.assertThat(metadataTableDao.readAllStreamPartitions(), Matchers.containsInAnyOrder(new PartitionRecord[]{partitionRecord4, new PartitionRecord(create2, Arrays.asList(create3, create4), plus, Collections.emptyList())}));
        metadataTableDao.deleteStreamPartitionRow(create2);
        MatcherAssert.assertThat(metadataTableDao.readAllStreamPartitions(), Matchers.containsInAnyOrder(new PartitionRecord[]{partitionRecord4}));
    }

    @Test
    public void testUpdateDetectNewPartitionWatermark() {
        Instant now = Instant.now();
        metadataTableDao.updateDetectNewPartitionWatermark(now);
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), metadataTableDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX));
        Assert.assertNull(MetadataTableEncoder.parseTokenFromRow(readRow));
        Assert.assertEquals(now, MetadataTableEncoder.parseWatermarkFromRow(readRow));
    }

    @Test
    public void testReadWriteLowWatermark() {
        Instant now = Instant.now();
        metadataTableDao.updateDetectNewPartitionWatermark(now);
        Assert.assertEquals(now, metadataTableDao.readDetectNewPartitionsState().getWatermark());
        Instant plus = now.plus(Duration.standardMinutes(10L));
        metadataTableDao.updateDetectNewPartitionWatermark(plus);
        Assert.assertEquals(plus, metadataTableDao.readDetectNewPartitionsState().getWatermark());
    }

    @Test
    public void testUpdateWatermark() {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Instant now = Instant.now();
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(create, SpannerIOReadTest.PROJECT_ID);
        metadataTableDao.updateWatermark(create, now, create2);
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), metadataTableDao.convertPartitionToStreamPartitionRowKey(create));
        Assert.assertEquals(create2.getToken(), MetadataTableEncoder.parseTokenFromRow(readRow));
        Assert.assertEquals(now, MetadataTableEncoder.parseWatermarkFromRow(readRow));
    }

    @Test
    public void readAndWriteValidMissingPartitionsDuration() {
        HashMap hashMap = new HashMap();
        hashMap.put(Range.ByteStringRange.create("A", "B"), Instant.ofEpochMilli(100L));
        metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        Assert.assertEquals(hashMap, metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void readAndWriteInvalidMissingPartitionsDuration() {
        HashMap hashMap = new HashMap();
        dataClient.mutateRow(RowMutation.create(metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)).setCell("missing_partitions", ByteString.copyFromUtf8("latest"), ByteString.copyFromUtf8("Invalid serialization")));
        Assert.assertEquals(hashMap, metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void readMissingPartitionsWithoutDNPRow() {
        Assert.assertEquals(new HashMap(), metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void readMissingPartitionsWithoutMissingPartitionsCell() {
        HashMap hashMap = new HashMap();
        metadataTableDao.updateDetectNewPartitionWatermark(Instant.now());
        Assert.assertEquals(hashMap, metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void testReleaseStreamPartitionLock() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, Instant.now(), SpannerIOReadTest.PROJECT_ID, Instant.now(), Collections.emptyList(), (Instant) null));
        Assert.assertFalse(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertTrue(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertFalse(metadataTableDao.releaseStreamPartitionLockForDeletion(create, "0000"));
        Assert.assertFalse(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertTrue(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertTrue(metadataTableDao.releaseStreamPartitionLockForDeletion(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertTrue(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertFalse(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
    }

    @Test
    public void testDeleteStreamPartitionRow() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        Assert.assertTrue(metadataTableDao.lockAndRecordPartition(new PartitionRecord(create, Instant.now(), SpannerIOReadTest.PROJECT_ID, Instant.now(), Collections.emptyList(), (Instant) null)));
        metadataTableDao.updateWatermark(create, Instant.now(), (ChangeStreamContinuationToken) null);
        Assert.assertFalse(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertTrue(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertFalse(metadataTableDao.deleteStreamPartitionRow(create));
        Assert.assertFalse(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertTrue(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertTrue(metadataTableDao.releaseStreamPartitionLockForDeletion(create, SpannerIOReadTest.PROJECT_ID));
        Assert.assertTrue(metadataTableDao.deleteStreamPartitionRow(create));
        Assert.assertTrue(metadataTableDao.readStreamPartitionsWithWatermark().isEmpty());
        Assert.assertFalse(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
    }

    @Test
    public void testLockPartitionRecordsMetadata() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "token1");
        PartitionRecord partitionRecord = new PartitionRecord(create, Collections.singletonList(create2), SpannerIOReadTest.PROJECT_ID, Instant.now(), Collections.emptyList(), Instant.now());
        metadataTableDao.lockAndRecordPartition(partitionRecord);
        Assert.assertTrue(metadataTableDao.doHoldLock(create, SpannerIOReadTest.PROJECT_ID));
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), metadataTableDao.convertPartitionToStreamPartitionRowKey(create));
        Assert.assertTrue(readRow.getCells("should_delete", "latest").isEmpty());
        Assert.assertEquals(partitionRecord.getParentLowWatermark(), MetadataTableEncoder.parseWatermarkFromRow(readRow));
        Assert.assertEquals(create2.toByteString(), ((RowCell) readRow.getCells("initial_continuation_token", Range.ByteStringRange.serializeToByteString(create2.getPartition())).get(0)).getValue());
    }
}
