package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ResumeFromPreviousPipelineActionTest.class */
public class ResumeFromPreviousPipelineActionTest {

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();
    private MetadataTableDao metadataTableDao;

    @Mock
    private ChangeStreamMetrics metrics;

    @Mock
    private DoFn.OutputReceiver<PartitionRecord> receiver;
    private Instant endTime;
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;
    private ResumeFromPreviousPipelineAction action;

    @Captor
    ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void setUp() throws Exception {
        MetadataTableAdminDao metadataTableAdminDao = new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, UniqueIdGenerator.generateRowKeyPrefix(), "__change_stream_md_table");
        metadataTableAdminDao.createMetadataTable();
        metadataTableAdminDao.cleanUpPrefix();
        this.metadataTableDao = new MetadataTableDao(dataClient, metadataTableAdminDao.getTableId(), metadataTableAdminDao.getChangeStreamNamePrefix());
        this.endTime = Instant.now().plus(Duration.standardSeconds(10L));
        this.action = new ResumeFromPreviousPipelineAction(this.metrics, this.metadataTableDao, this.endTime, new ProcessNewPartitionsAction(this.metrics, this.metadataTableDao, this.endTime));
    }

    @Test
    public void testResetMissingPartitions() throws InvalidProtocolBufferException {
        HashMap hashMap = new HashMap();
        hashMap.put(Range.ByteStringRange.create("A", "B"), Instant.now());
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(hashMap);
        Assert.assertFalse(this.metadataTableDao.readDetectNewPartitionMissingPartitions().isEmpty());
        this.action.run(this.receiver);
        Assert.assertTrue(this.metadataTableDao.readDetectNewPartitionMissingPartitions().isEmpty());
    }

    @Test
    public void testOutputExistingStreamPartitions() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        Instant minus = Instant.now().minus(Duration.standardSeconds(10L));
        PartitionRecord partitionRecord = new PartitionRecord(create, minus, "1", minus, Collections.emptyList(), (Instant) null);
        this.metadataTableDao.lockAndRecordPartition(partitionRecord);
        Range.ByteStringRange create2 = Range.ByteStringRange.create("B", "D");
        PartitionRecord partitionRecord2 = new PartitionRecord(create2, Arrays.asList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("B", "C"), "tokenBC"), ChangeStreamContinuationToken.create(Range.ByteStringRange.create("C", "D"), "tokenCD")), "2", Instant.now().plus(Duration.standardMinutes(1L)), Collections.emptyList(), (Instant) null);
        this.metadataTableDao.lockAndRecordPartition(partitionRecord2);
        Range.ByteStringRange create3 = Range.ByteStringRange.create("D", "H");
        Instant now = Instant.now();
        PartitionRecord partitionRecord3 = new PartitionRecord(create3, now, "3", now, Collections.emptyList(), (Instant) null);
        this.metadataTableDao.lockAndRecordPartition(partitionRecord3);
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("D", "H"), "tokenDH");
        this.metadataTableDao.updateWatermark(create3, now, create4);
        Range.ByteStringRange create5 = Range.ByteStringRange.create("H", "I");
        ChangeStreamContinuationToken create6 = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("H", "I"), "tokenHI");
        PartitionRecord partitionRecord4 = new PartitionRecord(create5, Collections.singletonList(create6), "4", Instant.now(), Collections.emptyList(), (Instant) null);
        this.metadataTableDao.lockAndRecordPartition(partitionRecord4);
        this.metadataTableDao.releaseStreamPartitionLockForDeletion(create5, partitionRecord4.getUuid());
        this.action.run(this.receiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(4))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        List allValues = this.partitionRecordArgumentCaptor.getAllValues();
        PartitionRecord partitionRecord5 = (PartitionRecord) allValues.get(0);
        Assert.assertEquals(create, partitionRecord5.getPartition());
        Assert.assertEquals(partitionRecord.getUuid(), partitionRecord5.getUuid());
        Assert.assertNull(partitionRecord5.getChangeStreamContinuationTokens());
        Assert.assertEquals(partitionRecord.getStartTime(), partitionRecord5.getStartTime());
        Assert.assertEquals(this.endTime, partitionRecord5.getEndTime());
        Assert.assertEquals(partitionRecord.getParentLowWatermark(), partitionRecord5.getParentLowWatermark());
        PartitionRecord partitionRecord6 = (PartitionRecord) allValues.get(1);
        Assert.assertEquals(create2, partitionRecord6.getPartition());
        Assert.assertEquals(partitionRecord2.getUuid(), partitionRecord6.getUuid());
        Assert.assertNull(partitionRecord6.getStartTime());
        Assert.assertEquals(partitionRecord2.getChangeStreamContinuationTokens(), partitionRecord6.getChangeStreamContinuationTokens());
        Assert.assertEquals(this.endTime, partitionRecord6.getEndTime());
        Assert.assertEquals(partitionRecord2.getParentLowWatermark(), partitionRecord6.getParentLowWatermark());
        PartitionRecord partitionRecord7 = (PartitionRecord) allValues.get(2);
        Assert.assertEquals(create3, partitionRecord7.getPartition());
        Assert.assertEquals(partitionRecord3.getUuid(), partitionRecord7.getUuid());
        Assert.assertNull(partitionRecord7.getStartTime());
        Assert.assertNotNull(partitionRecord7.getChangeStreamContinuationTokens());
        Assert.assertEquals(1L, partitionRecord7.getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create4, partitionRecord7.getChangeStreamContinuationTokens().get(0));
        Assert.assertEquals(this.endTime, partitionRecord7.getEndTime());
        Assert.assertEquals(partitionRecord3.getParentLowWatermark(), partitionRecord7.getParentLowWatermark());
        PartitionRecord partitionRecord8 = (PartitionRecord) allValues.get(3);
        Assert.assertEquals(create5, partitionRecord8.getPartition());
        Assert.assertNotEquals("4", partitionRecord8.getUuid());
        Assert.assertNull(partitionRecord8.getStartTime());
        Assert.assertNotNull(partitionRecord8.getChangeStreamContinuationTokens());
        Assert.assertEquals(1L, partitionRecord8.getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create6, partitionRecord8.getChangeStreamContinuationTokens().get(0));
        Assert.assertEquals(this.endTime, partitionRecord8.getEndTime());
        Assert.assertEquals(partitionRecord4.getParentLowWatermark(), partitionRecord8.getParentLowWatermark());
    }

    @Test
    public void testOutputNewPartitions() throws InvalidProtocolBufferException {
        Range.ByteStringRange create = Range.ByteStringRange.create("A", "B");
        ChangeStreamContinuationToken create2 = ChangeStreamContinuationToken.create(create, "tokenAB");
        Instant minus = Instant.now().minus(Duration.standardSeconds(10L));
        NewPartition newPartition = new NewPartition(create, Collections.singletonList(create2), minus);
        this.metadataTableDao.writeNewPartition(newPartition);
        this.metadataTableDao.writeNewPartition(new NewPartition(Range.ByteStringRange.create("B", "D"), Collections.singletonList(ChangeStreamContinuationToken.create(Range.ByteStringRange.create("C", "D"), "tokenCD")), Instant.now().plus(Duration.standardSeconds(10L))));
        Range.ByteStringRange create3 = Range.ByteStringRange.create("D", "E");
        ChangeStreamContinuationToken create4 = ChangeStreamContinuationToken.create(create3, "tokenDE");
        Instant plus = Instant.now().plus(Duration.standardSeconds(5L));
        NewPartition newPartition2 = new NewPartition(create3, Collections.singletonList(create4), plus);
        this.metadataTableDao.writeNewPartition(newPartition2);
        this.metadataTableDao.markNewPartitionForDeletion(newPartition2);
        Assert.assertEquals(2L, this.metadataTableDao.readNewPartitions().size());
        this.action.run(this.receiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.receiver, Mockito.times(2))).outputWithTimestamp((PartitionRecord) this.partitionRecordArgumentCaptor.capture(), (Instant) ArgumentMatchers.eq(Instant.EPOCH));
        List allValues = this.partitionRecordArgumentCaptor.getAllValues();
        PartitionRecord partitionRecord = (PartitionRecord) allValues.get(0);
        Assert.assertEquals(create, partitionRecord.getPartition());
        Assert.assertEquals(1L, partitionRecord.getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create2, partitionRecord.getChangeStreamContinuationTokens().get(0));
        Assert.assertNull(partitionRecord.getStartTime());
        Assert.assertEquals(minus, partitionRecord.getParentLowWatermark());
        Assert.assertEquals(this.endTime, partitionRecord.getEndTime());
        Assert.assertEquals(Collections.singletonList(newPartition), partitionRecord.getParentPartitions());
        Assert.assertFalse(partitionRecord.getUuid().isEmpty());
        PartitionRecord partitionRecord2 = (PartitionRecord) allValues.get(1);
        Assert.assertEquals(create3, partitionRecord2.getPartition());
        Assert.assertEquals(1L, partitionRecord2.getChangeStreamContinuationTokens().size());
        Assert.assertEquals(create4, partitionRecord2.getChangeStreamContinuationTokens().get(0));
        Assert.assertNull(partitionRecord2.getStartTime());
        Assert.assertEquals(plus, partitionRecord2.getParentLowWatermark());
        Assert.assertEquals(this.endTime, partitionRecord2.getEndTime());
        Assert.assertEquals(Collections.singletonList(newPartition2), partitionRecord2.getParentPartitions());
        Assert.assertEquals(1L, this.metadataTableDao.readNewPartitions().size());
    }
}
