package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.class */
public class MetadataTableDaoTest {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDaoTest.class);

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
    private static MetadataTableDao metadataTableDao;
    private static MetadataTableAdminDao metadataTableAdminDao;
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;

    /* renamed from: org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDaoTest$1LockPartition, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest$1LockPartition.class */
    class C1LockPartition implements Runnable {
        final String id;
        boolean locked = false;
        final /* synthetic */ Range.ByteStringRange val$partition;

        C1LockPartition(String str, Range.ByteStringRange byteStringRange) {
            this.val$partition = byteStringRange;
            this.id = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Thread.sleep((int) (Math.random() * 1000.0d));
                if (MetadataTableDaoTest.metadataTableDao.lockPartition(this.val$partition, this.id)) {
                    this.locked = true;
                }
            } catch (InterruptedException e) {
                MetadataTableDaoTest.LOG.error(e.toString());
            }
        }
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void before() {
        metadataTableAdminDao = new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, UniqueIdGenerator.generateRowKeyPrefix(), "__change_stream_md_table");
        metadataTableAdminDao.createMetadataTable();
        metadataTableDao = new MetadataTableDao(dataClient, metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix());
    }

    @Test
    public void testStreamPartitionRowKeyConversion() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Assert.assertEquals(create, metadataTableDao.convertStreamPartitionRowKeyToPartition(metadataTableDao.convertPartitionToStreamPartitionRowKey(create)));
    }

    @Test
    public void testStreamPartitionRowKeyConversionWithIllegalUtf8() throws InvalidProtocolBufferException {
        ByteString copyFrom = ByteString.copyFrom(new byte[]{-116});
        Range.ByteStringRange create = Range.ByteStringRange.create(copyFrom, copyFrom);
        Assert.assertEquals(create, metadataTableDao.convertStreamPartitionRowKeyToPartition(metadataTableDao.convertPartitionToStreamPartitionRowKey(create)));
    }

    @Test
    public void testLockPartitionRace() throws InterruptedException {
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        ByteString convertPartitionToStreamPartitionRowKey = metadataTableDao.convertPartitionToStreamPartitionRowKey(create);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            arrayList.add(new C1LockPartition(Integer.toString(i), create));
            arrayList2.add(new Thread((Runnable) arrayList.get(i)));
        }
        for (int i2 = 0; i2 < 1000; i2++) {
            ((Thread) arrayList2.get(i2)).start();
        }
        for (int i3 = 0; i3 < 1000; i3++) {
            ((Thread) arrayList2.get(i3)).join();
        }
        int i4 = -1;
        for (int i5 = 0; i5 < 1000; i5++) {
            if (((C1LockPartition) arrayList.get(i5)).locked) {
                if (i4 == -1) {
                    i4 = i5;
                } else {
                    Assert.fail("Multiple owner on the lock. Both " + i4 + " and " + i5 + " (and possibly more) think they hold the lock.");
                }
            }
        }
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), convertPartitionToStreamPartitionRowKey, Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch("lock")).filter(Filters.FILTERS.qualifier().exactMatch("latest")));
        Assert.assertEquals(1L, readRow.getCells().size());
        Assert.assertEquals(Integer.toString(i4), ((RowCell) readRow.getCells().get(0)).getValue().toStringUtf8());
        dataClient.mutateRow(RowMutation.create(metadataTableAdminDao.getTableId(), convertPartitionToStreamPartitionRowKey).deleteRow());
    }

    @Test
    public void testReadStreamPartitionsWithWatermark() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "");
        Instant now = Instant.now();
        metadataTableDao.updateWatermark(create, now, (ChangeStreamContinuationToken) null);
        ServerStream readFromMdTableStreamPartitionsWithWatermark = metadataTableDao.readFromMdTableStreamPartitionsWithWatermark();
        ArrayList arrayList = new ArrayList();
        Iterator it = readFromMdTableStreamPartitionsWithWatermark.iterator();
        while (it.hasNext()) {
            arrayList.add((Row) it.next());
        }
        Assert.assertEquals(1L, arrayList.size());
        Assert.assertEquals(now, MetadataTableEncoder.parseWatermarkFromRow((Row) arrayList.get(0)));
        Assert.assertEquals(create, metadataTableDao.convertStreamPartitionRowKeyToPartition(((Row) arrayList.get(0)).getKey()));
    }

    @Test
    public void testNewPartitionsWriteRead() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("", "");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("", "a");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(create2, "tk1");
        Range.ByteStringRange create4 = Range.ByteStringRange.create("a", "");
        ChangeStreamContinuationToken create5 = ChangeStreamContinuationToken.create(create4, "tk2");
        Instant now = Instant.now();
        metadataTableDao.writeNewPartition(create2, create3, create, now);
        metadataTableDao.writeNewPartition(create4, create5, create, now);
        int i = 0;
        boolean z = false;
        boolean z2 = false;
        Iterator it = metadataTableDao.readNewPartitions().iterator();
        while (it.hasNext()) {
            i++;
            Range.ByteStringRange byteStringRange = Range.ByteStringRange.toByteStringRange(((Row) it.next()).getKey().substring(metadataTableDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.NEW_PARTITION_PREFIX).size()));
            if (byteStringRange.equals(create2)) {
                z = true;
            } else if (byteStringRange.equals(create4)) {
                z2 = true;
            }
        }
        Assert.assertTrue(z);
        Assert.assertTrue(z2);
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testUpdateAndReadWatermark() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Instant now = Instant.now();
        metadataTableDao.updateWatermark(create, now, (ChangeStreamContinuationToken) null);
        Range.ByteStringRange create2 = Range.ByteStringRange.create("b", "c");
        Instant now2 = Instant.now();
        metadataTableDao.updateWatermark(create2, now2, (ChangeStreamContinuationToken) null);
        int i = 0;
        boolean z = false;
        boolean z2 = false;
        Iterator it = metadataTableDao.readFromMdTableStreamPartitionsWithWatermark().iterator();
        while (it.hasNext()) {
            Row row = (Row) it.next();
            i++;
            Range.ByteStringRange convertStreamPartitionRowKeyToPartition = metadataTableDao.convertStreamPartitionRowKeyToPartition(row.getKey());
            if (convertStreamPartitionRowKeyToPartition.equals(create)) {
                Assert.assertEquals(now, MetadataTableEncoder.parseWatermarkFromRow(row));
                z = true;
            } else if (convertStreamPartitionRowKeyToPartition.equals(create2)) {
                Assert.assertEquals(now2, MetadataTableEncoder.parseWatermarkFromRow(row));
                z2 = true;
            }
        }
        Assert.assertEquals(2L, i);
        Assert.assertTrue(z);
        Assert.assertTrue(z2);
    }

    @Test
    public void testUpdateWatermark() {
        Range.ByteStringRange create = Range.ByteStringRange.create("a", "b");
        Instant now = Instant.now();
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(create, SpannerIOReadTest.PROJECT_ID);
        metadataTableDao.updateWatermark(create, now, create2);
        Row readRow = dataClient.readRow(metadataTableAdminDao.getTableId(), metadataTableDao.convertPartitionToStreamPartitionRowKey(create));
        Assert.assertEquals(create2.getToken(), MetadataTableEncoder.getTokenFromRow(readRow));
        Assert.assertEquals(now, MetadataTableEncoder.parseWatermarkFromRow(readRow));
    }

    @Test
    public void readAndWriteValidMissingPartitionsDuration() {
        HashMap hashMap = new HashMap();
        hashMap.put(Range.ByteStringRange.create("A", "B"), 100L);
        metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        Assert.assertEquals(hashMap, metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void readAndWriteInvalidMissingPartitionsDuration() {
        HashMap hashMap = new HashMap();
        dataClient.mutateRow(RowMutation.create(metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix().concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX)).setCell("missing_partitions", ByteString.copyFromUtf8("latest"), ByteString.copyFromUtf8("Invalid serialization")));
        Assert.assertEquals(hashMap, metadataTableDao.readDetectNewPartitionMissingPartitions());
    }

    @Test
    public void readMissingPartitionsWithoutDNPRow() {
        Assert.assertEquals(new HashMap(), metadataTableDao.readDetectNewPartitionMissingPartitions());
    }
}
