package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import com.google.protobuf.ByteString;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.InitialPipelineState;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.class */
public class InitializeDoFnTest {

    @ClassRule
    public static final BigtableEmulatorRule BIGTABLE_EMULATOR_RULE = BigtableEmulatorRule.create();

    @Mock
    private DaoFactory daoFactory;

    @Mock
    private transient MetadataTableAdminDao metadataTableAdminDao;
    private transient MetadataTableDao metadataTableDao;

    @Mock
    private DoFn.OutputReceiver<InitialPipelineState> outputReceiver;
    private final String tableId = "table";
    private static BigtableDataClient dataClient;
    private static BigtableTableAdminClient adminClient;

    @BeforeClass
    public static void beforeClass() throws IOException {
        adminClient = BigtableTableAdminClient.create(BigtableTableAdminSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
        dataClient = BigtableDataClient.create(BigtableDataSettings.newBuilderForEmulator(BIGTABLE_EMULATOR_RULE.getPort()).setProjectId("fake-project").setInstanceId("fake-instance").build());
    }

    @Before
    public void setUp() throws IOException {
        this.metadataTableAdminDao = (MetadataTableAdminDao) Mockito.spy(new MetadataTableAdminDao(adminClient, (BigtableInstanceAdminClient) null, "changeStreamName", "table"));
        this.metadataTableAdminDao.createMetadataTable();
        ((MetadataTableAdminDao) Mockito.doReturn(true).when(this.metadataTableAdminDao)).isAppProfileSingleClusterAndTransactional((String) ArgumentMatchers.any());
        Mockito.when(this.daoFactory.getMetadataTableAdminDao()).thenReturn(this.metadataTableAdminDao);
        this.metadataTableDao = new MetadataTableDao(dataClient, "table", this.metadataTableAdminDao.getChangeStreamNamePrefix());
        Mockito.when(this.daoFactory.getMetadataTableDao()).thenReturn(this.metadataTableDao);
        Mockito.when(this.daoFactory.getChangeStreamName()).thenReturn("changeStreamName");
    }

    @Test
    public void testInitializeDefault() throws IOException {
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
    }

    @Test
    public void testInitializeStopWithExistingPipeline() throws IOException {
        this.metadataTableDao.updateDetectNewPartitionWatermark(Instant.now());
        new InitializeDoFn(this.daoFactory, "app-profile", Instant.now(), BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.never())).output((InitialPipelineState) ArgumentMatchers.any());
    }

    @Test
    public void testInitializeStopWithoutDNP() throws IOException {
        dataClient.mutateRow(RowMutation.create("table", this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"))).setCell("watermark", "latest", 123L));
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
        Assert.assertNull(dataClient.readRow("table", this.metadataTableAdminDao.getChangeStreamNamePrefix()));
    }

    @Test
    public void testInitializeNewWithoutDNP() throws IOException {
        dataClient.mutateRow(RowMutation.create("table", this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"))).setCell("watermark", "latest", 123L));
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.NEW).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
        Assert.assertNull(dataClient.readRow("table", this.metadataTableAdminDao.getChangeStreamNamePrefix()));
    }

    @Test
    public void testInitializeNewWithDNP() throws IOException {
        this.metadataTableDao.updateDetectNewPartitionWatermark(Instant.now());
        dataClient.mutateRow(RowMutation.create("table", this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"))).setCell("watermark", "latest", 123L));
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.NEW).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
        Assert.assertNull(dataClient.readRow("table", this.metadataTableAdminDao.getChangeStreamNamePrefix()));
    }

    @Test
    public void testInitializeResumeWithoutDNP() throws IOException {
        dataClient.mutateRow(RowMutation.create("table", this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"))).setCell("watermark", "latest", 123L));
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
    }

    @Test
    public void testInitializeResumeWithDNP() throws IOException {
        Instant minus = Instant.now().minus(Duration.standardSeconds(10000L));
        this.metadataTableDao.updateDetectNewPartitionWatermark(minus);
        dataClient.mutateRow(RowMutation.create("table", this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"))).setCell("watermark", "latest", 123L));
        new InitializeDoFn(this.daoFactory, "app-profile", Instant.now(), BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(minus, true));
        Assert.assertNull(dataClient.readRow("table", this.metadataTableAdminDao.getChangeStreamNamePrefix()));
    }

    @Test
    public void testInitializeSkipCleanupWithoutDNP() throws IOException {
        ByteString concat = this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"));
        dataClient.mutateRow(RowMutation.create("table", concat).setCell("watermark", "latest", 123L));
        Instant now = Instant.now();
        new InitializeDoFn(this.daoFactory, "app-profile", now, BigtableIO.ExistingPipelineOptions.SKIP_CLEANUP).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.times(1))).output(new InitialPipelineState(now, false));
        Assert.assertNotNull(dataClient.readRow("table", concat));
    }

    @Test
    public void testInitializeSkipCleanupWithDNP() throws IOException {
        this.metadataTableDao.updateDetectNewPartitionWatermark(Instant.now().minus(Duration.standardSeconds(10000L)));
        ByteString concat = this.metadataTableAdminDao.getChangeStreamNamePrefix().concat(ByteString.copyFromUtf8("existing_row"));
        dataClient.mutateRow(RowMutation.create("table", concat).setCell("watermark", "latest", 123L));
        new InitializeDoFn(this.daoFactory, "app-profile", Instant.now(), BigtableIO.ExistingPipelineOptions.SKIP_CLEANUP).processElement(this.outputReceiver);
        ((DoFn.OutputReceiver) Mockito.verify(this.outputReceiver, Mockito.never())).output((InitialPipelineState) ArgumentMatchers.any());
        Assert.assertNotNull(dataClient.readRow("table", concat));
    }
}
