package org.apache.beam.sdk.io.gcp.bigtable.changestreams.it;

import com.google.api.gax.batching.Batcher;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.class */
public class BigtableChangeStreamIT {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamIT.class);
    private static final String COLUMN_FAMILY1 = "CF";
    private static final String COLUMN_FAMILY2 = "CF2";
    private static final String COLUMN_QUALIFIER = "CQ";
    private static String projectId;
    private static String instanceId;
    private static String tableId;
    private static String appProfileId;
    private static String metadataTableId;
    private static BigtableTableAdminClient adminClient;
    private static BigtableDataClient dataClient;
    private static BigtableClientIntegrationTestOverride bigtableClientOverride;
    private static Batcher<RowMutationEntry, Void> mutationBatcher;
    private static BigtableChangeStreamTestOptions options;
    private transient TestPipeline pipeline;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT$ConvertToEntry.class */
    public static class ConvertToEntry extends DoFn<KV<ByteString, ChangeStreamMutation>, MutateRowsRequest.Entry> {
        private ConvertToEntry() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<ByteString, ChangeStreamMutation> kv, DoFn.OutputReceiver<MutateRowsRequest.Entry> outputReceiver) {
            outputReceiver.output(((ChangeStreamMutation) kv.getValue()).toRowMutationEntry().toProto());
        }
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        options = IOITHelper.readIOTestPipelineOptions(BigtableChangeStreamTestOptions.class);
        LOG.info("Pipeline options: {}", options);
        projectId = options.as(GcpOptions.class).getProject();
        instanceId = options.getBigtableChangeStreamInstanceId();
        long millis = Instant.now().getMillis();
        tableId = "beam-change-stream-test-" + millis;
        metadataTableId = "beam-change-stream-test-md-" + millis;
        appProfileId = "default";
        bigtableClientOverride = new BigtableClientIntegrationTestOverride();
        LOG.info(bigtableClientOverride.toString());
        BigtableDataSettings.Builder newBuilder = BigtableDataSettings.newBuilder();
        BigtableTableAdminSettings.Builder newBuilder2 = BigtableTableAdminSettings.newBuilder();
        newBuilder.setProjectId(projectId);
        newBuilder2.setProjectId(projectId);
        newBuilder.setInstanceId(instanceId);
        newBuilder2.setInstanceId(instanceId);
        newBuilder.setAppProfileId(appProfileId);
        newBuilder.stubSettings().setTransportChannelProvider(EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder().setAttemptDirectPath(false).build());
        bigtableClientOverride.updateDataClientSettings(newBuilder);
        bigtableClientOverride.updateTableAdminClientSettings(newBuilder2);
        dataClient = BigtableDataClient.create(newBuilder.build());
        adminClient = BigtableTableAdminClient.create(newBuilder2.build());
        adminClient.createTable(CreateTableRequest.of(tableId).addChangeStreamRetention(Duration.ofDays(1L)).addFamily(COLUMN_FAMILY1).addFamily(COLUMN_FAMILY2));
        mutationBatcher = dataClient.newBulkMutationBatcher(tableId);
    }

    @Before
    public void before() {
        this.pipeline = TestPipeline.fromOptions(options).enableAbandonedNodeEnforcement(false);
    }

    @AfterClass
    public static void afterClass() {
        if (adminClient != null) {
            if (adminClient.exists(tableId)) {
                adminClient.updateTable(UpdateTableRequest.of(tableId).disableChangeStreamRetention());
                adminClient.deleteTable(tableId);
                adminClient.deleteTable(metadataTableId);
            }
            adminClient.close();
        }
        if (dataClient != null) {
            dataClient.close();
        }
    }

    @Test
    public void testReadBigtableChangeStream() throws InterruptedException {
        Instant now = Instant.now();
        RowMutationEntry cell = RowMutationEntry.create("rowKeySetCell").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testDeleteRow() throws InterruptedException {
        Instant now = Instant.now();
        RowMutationEntry cell = RowMutationEntry.create("rowKeyToDelete").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
        RowMutationEntry deleteRow = RowMutationEntry.create("rowKeyToDelete").deleteRow();
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        mutationBatcher.add(deleteRow);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto(), RowMutationEntry.create("rowKeyToDelete").deleteFamily(COLUMN_FAMILY1).deleteFamily(COLUMN_FAMILY2).toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testDeleteColumnFamily() throws InterruptedException {
        Instant now = Instant.now();
        RowMutationEntry cell = RowMutationEntry.create("rowKeyMultiFamily").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1").setCell(COLUMN_FAMILY2, COLUMN_QUALIFIER, "cell value 1");
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        RowMutationEntry deleteFamily = RowMutationEntry.create("rowKeyMultiFamily").deleteFamily(COLUMN_FAMILY2);
        mutationBatcher.add(deleteFamily);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto(), deleteFamily.toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testDeleteCell() throws InterruptedException {
        Instant now = Instant.now();
        RowMutationEntry cell = RowMutationEntry.create("rowKeyMultiCell").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1").setCell(COLUMN_FAMILY1, "CQ2", "cell value 1");
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        RowMutationEntry deleteCells = RowMutationEntry.create("rowKeyMultiCell").deleteCells(COLUMN_FAMILY1, ByteString.copyFromUtf8("CQ2"), Range.TimestampRange.create(now.getMillis() * 1000, now.plus(org.joda.time.Duration.standardMinutes(2L)).getMillis() * 1000));
        mutationBatcher.add(deleteCells);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto(), deleteCells.toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testComplexMutation() throws InterruptedException {
        Instant now = Instant.now();
        RowMutationEntry cell = RowMutationEntry.create("rowKeyComplex").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        RowMutationEntry deleteCells = RowMutationEntry.create("rowKeyComplex").setCell(COLUMN_FAMILY1, "CQ2", "cell value 2").setCell(COLUMN_FAMILY1, "CQ3", "cell value 3").deleteCells(COLUMN_FAMILY1, ByteString.copyFromUtf8(COLUMN_QUALIFIER), Range.TimestampRange.create(now.getMillis() * 1000, now.plus(org.joda.time.Duration.standardMinutes(2L)).getMillis() * 1000));
        mutationBatcher.add(deleteCells);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto(), deleteCells.toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testLargeMutation() throws InterruptedException {
        Instant now = Instant.now();
        char[] cArr = new char[1536000];
        Arrays.fill(cArr, (char) 8203);
        RowMutationEntry cell = RowMutationEntry.create("rowKeyLargeCell").setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, String.valueOf(cArr));
        mutationBatcher.add(cell);
        mutationBatcher.flush();
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder(new MutateRowsRequest.Entry[]{cell.toProto()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testManyMutations() throws InterruptedException {
        Instant now = Instant.now();
        char[] cArr = new char[3072];
        Arrays.fill(cArr, (char) 8203);
        String valueOf = String.valueOf(cArr);
        ImmutableList.Builder builder = ImmutableList.builder();
        for (int i = 0; i < 100; i++) {
            String str = "rowKey" + i;
            RowMutationEntry cell = RowMutationEntry.create(str).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, valueOf);
            RowMutationEntry deleteFamily = RowMutationEntry.create(str).deleteFamily(COLUMN_FAMILY1);
            RowMutationEntry deleteCells = RowMutationEntry.create(str).deleteCells(COLUMN_FAMILY1, ByteString.copyFromUtf8(COLUMN_QUALIFIER), Range.TimestampRange.create(now.getMillis() * 1000, now.plus(org.joda.time.Duration.standardMinutes(2L)).getMillis() * 1000));
            builder.add(cell);
            mutationBatcher.add(cell);
            mutationBatcher.flush();
            builder.add(deleteFamily);
            mutationBatcher.add(deleteFamily);
            mutationBatcher.flush();
            builder.add(deleteCells);
            mutationBatcher.add(deleteCells);
            mutationBatcher.flush();
        }
        PAssert.that(buildPipeline(now, Instant.now().plus(org.joda.time.Duration.standardSeconds(1L)))).containsInAnyOrder((Iterable) builder.build().stream().map((v0) -> {
            return v0.toProto();
        }).collect(Collectors.toList()));
        this.pipeline.run().waitUntilFinish();
    }

    private PCollection<MutateRowsRequest.Entry> buildPipeline(Instant instant, Instant instant2) {
        return this.pipeline.apply(BigtableTestUtils.buildTestPipelineInput(projectId, instanceId, tableId, appProfileId, metadataTableId, instant, instant2, bigtableClientOverride)).apply(ParDo.of(new ConvertToEntry()));
    }
}
