package org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconcilerTest.class */
public class PartitionReconcilerTest {

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
    private static final Duration MISSING_SHORT_PERIOD = Duration.standardSeconds(121);
    private static final Duration MISSING_LONG_PERIOD = Duration.standardSeconds(1201);
    private MetadataTableDao metadataTableDao;
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;
    private Instant lowWatermark;
    private Instant startTime;

    @Mock
    private ChangeStreamMetrics metrics;

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void setUp() throws Exception {
        MetadataTableAdminDao metadataTableAdminDao = new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, UniqueIdGenerator.generateRowKeyPrefix(), "__change_stream_md_table");
        metadataTableAdminDao.createMetadataTable();
        metadataTableAdminDao.cleanUpPrefix();
        this.metadataTableDao = new MetadataTableDao(dataClient, metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix());
        this.lowWatermark = Instant.now();
        this.startTime = this.lowWatermark.minus(Duration.standardMinutes(30L));
    }

    @Test
    public void testNewMissingMergePartitionIsNotReconciled() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("A", "C");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(create, "AB");
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create));
        partitionReconciler.addIncompleteNewPartitions(new NewPartition(create2, Collections.singletonList(create3), Instant.now()));
        Assert.assertEquals(0L, partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime).size());
    }

    @Test
    public void testLongMissingMergePartitionIsReconciled() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("A", "C");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(create, "AB");
        HashMap hashMap = new HashMap();
        hashMap.put(create, Instant.now().minus(MISSING_SHORT_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create));
        NewPartition newPartition = new NewPartition(create2, Collections.singletonList(create3), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition);
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        PartitionRecord partitionRecord = new PartitionRecord(create, Collections.singletonList(create3), this.lowWatermark, Collections.singletonList(newPartition));
        Assert.assertEquals(1L, partitionsToReconcile.size());
        Assert.assertEquals(partitionRecord, partitionsToReconcile.get(0));
    }

    @Test
    public void testMismatchedMergePartitionIsReconciled() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("A", "D");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("A", "B"), "AB");
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "BC");
        ChangeStreamContinuationToken create5 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("C", "D"), "CD");
        HashMap hashMap = new HashMap();
        hashMap.put(create2, Instant.now().minus(MISSING_LONG_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create2));
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(create3), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition);
        NewPartition newPartition2 = new NewPartition(create2, Arrays.asList(create4, create5), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition2);
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        PartitionRecord partitionRecord = new PartitionRecord(create2, Arrays.asList(create3, create4, create5), this.lowWatermark, Arrays.asList(newPartition, new NewPartition(create2, Collections.singletonList(create4), newPartition2.getLowWatermark()), new NewPartition(create2, Collections.singletonList(create5), newPartition2.getLowWatermark())));
        Assert.assertEquals(1L, partitionsToReconcile.size());
        Assert.assertEquals(partitionRecord, partitionsToReconcile.get(0));
    }

    @Test
    public void testMismatchedMergeSplitPartitionIsReconciled() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "C");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("A", "D");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("A", "B"), "AB");
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "BC");
        HashMap hashMap = new HashMap();
        hashMap.put(create, Instant.now().minus(MISSING_LONG_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create));
        NewPartition newPartition = new NewPartition(create2, Collections.singletonList(create3), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition);
        NewPartition newPartition2 = new NewPartition(create, Collections.singletonList(create4), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition2);
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        PartitionRecord partitionRecord = new PartitionRecord(create, Arrays.asList(create3, create4), this.lowWatermark, Arrays.asList(newPartition, newPartition2));
        Assert.assertEquals(1L, partitionsToReconcile.size());
        Assert.assertEquals(partitionRecord, partitionsToReconcile.get(0));
    }

    @Test
    public void testMissingPartitionWithoutToken() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        HashMap hashMap = new HashMap();
        hashMap.put(create, Instant.now().minus(MISSING_LONG_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create));
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        PartitionRecord partitionRecord = new PartitionRecord(create, this.startTime, this.lowWatermark, Collections.emptyList());
        Assert.assertEquals(1L, partitionsToReconcile.size());
        Assert.assertEquals(partitionRecord, partitionsToReconcile.get(0));
        Assert.assertTrue(this.metadataTableDao.readDetectNewPartitionMissingPartitions().isEmpty());
    }

    @Test
    public void testMissingPartitionWithSomeToken() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "D");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("B", "C");
        Range.ByteStringRange create3 = Range.ByteStringRange.create("A", "B");
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(create3, "AB");
        Range.ByteStringRange create5 = Range.ByteStringRange.create("C", "D");
        ChangeStreamContinuationToken create6 = ChangeStreamContinuationToken.create(create5, "CD");
        HashMap hashMap = new HashMap();
        hashMap.put(create, Instant.now().minus(MISSING_LONG_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create));
        NewPartition newPartition = new NewPartition(create, Arrays.asList(create4, create6), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition);
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        NewPartition newPartition2 = new NewPartition(create, Collections.singletonList(create4), newPartition.getLowWatermark());
        NewPartition newPartition3 = new NewPartition(create, Collections.singletonList(create6), newPartition.getLowWatermark());
        PartitionRecord partitionRecord = new PartitionRecord(create3, Collections.singletonList(create4), this.lowWatermark, Collections.singletonList(newPartition2));
        PartitionRecord partitionRecord2 = new PartitionRecord(create5, Collections.singletonList(create6), this.lowWatermark, Collections.singletonList(newPartition3));
        PartitionRecord partitionRecord3 = new PartitionRecord(create2, this.startTime, this.lowWatermark, Collections.emptyList());
        Assert.assertEquals(3L, partitionsToReconcile.size());
        MatcherAssert.assertThat(partitionsToReconcile, Matchers.containsInAnyOrder(Arrays.asList(partitionRecord, partitionRecord3, partitionRecord2).toArray()));
    }

    @Test
    public void testMissingPartitionWithTokenMoreThan10Minutes() {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "D");
        Range.ByteStringRange create2 = Range.ByteStringRange.create("A", "B");
        ChangeStreamContinuationToken create3 = ChangeStreamContinuationToken.create(create2, "AB");
        HashMap hashMap = new HashMap();
        hashMap.put(create2, Instant.now().minus(MISSING_LONG_PERIOD));
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        PartitionReconciler partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        partitionReconciler.addMissingPartitions(Collections.singletonList(create2));
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(create3), Instant.now());
        partitionReconciler.addIncompleteNewPartitions(newPartition);
        List partitionsToReconcile = partitionReconciler.getPartitionsToReconcile(this.lowWatermark, this.startTime);
        Assert.assertTrue(this.metadataTableDao.readDetectNewPartitionMissingPartitions().isEmpty());
        Assert.assertEquals(1L, partitionsToReconcile.size());
        Assert.assertEquals(partitionsToReconcile, Collections.singletonList(new PartitionRecord(create2, Collections.singletonList(create3), this.lowWatermark, Collections.singletonList(new NewPartition(create, Collections.singletonList(create3), newPartition.getLowWatermark())))));
    }
}
