package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.class */
public class SpannerChangeStreamsSchemaTransformIT {

    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static String instanceId;
    private static String projectId;
    private static String databaseId;
    private static String metadataTableName;
    private static String changeStreamTableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    @BeforeClass
    public static void beforeClass() throws Exception {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();
        metadataTableName = ENV.getMetadataTableName();
        changeStreamTableName = ENV.createSingersTable();
        changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
        System.out.println(changeStreamName);
        databaseClient = ENV.getDatabaseClient();
    }

    @Before
    public void before() {
        this.pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
        this.pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
    }

    @Test
    public void testReadSpannerChangeStream() {
        Timestamp timestamp = (Timestamp) insertRows(5).getLeft();
        updateRows(5);
        PCollection apply = PCollectionRowTuple.empty(this.pipeline).apply(new SpannerChangestreamsReadSchemaTransformProvider().from(SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration.builder().setDatabaseId(databaseId).setInstanceId(instanceId).setProjectId(projectId).setTable(changeStreamTableName).setChangeStreamName(changeStreamName).setStartAtTimestamp(timestamp.toString()).setEndAtTimestamp(((Timestamp) deleteRows(5).getRight()).toString()).build()).buildTransform()).get("output").apply(Window.into(new GlobalWindows()).triggering(AfterWatermark.pastEndOfWindow()).discardingFiredPanes());
        Assert.assertEquals(Schema.builder().addStringField("operation").addStringField("commitTimestamp").addInt64Field("recordSequence").addRowField("rowValues", Schema.builder().addNullableField("singerid", Schema.FieldType.INT64).addNullableField("firstname", Schema.FieldType.STRING).addNullableField("lastname", Schema.FieldType.STRING).addNullableField("singerinfo", Schema.FieldType.BYTES).setOptions(Schema.Options.builder().addOptions(Schema.Options.builder().setOption("primaryKeyColumns", Schema.FieldType.array(Schema.FieldType.STRING), Collections.singletonList("singerid")).build()).build()).build()).build(), apply.getSchema());
        PAssert.that(apply.apply(Select.fieldNames(new String[]{"operation"}))).containsInAnyOrder(new Row[]{operationRow("INSERT"), operationRow("INSERT"), operationRow("INSERT"), operationRow("INSERT"), operationRow("INSERT"), operationRow("UPDATE"), operationRow("UPDATE"), operationRow("UPDATE"), operationRow("UPDATE"), operationRow("UPDATE"), operationRow("DELETE"), operationRow("DELETE"), operationRow("DELETE"), operationRow("DELETE"), operationRow("DELETE")});
        this.pipeline.run().waitUntilFinish();
        assertMetadataTableHasBeenDropped();
    }

    private Row operationRow(String str) {
        return Row.withSchema(Schema.builder().addField("operation", Schema.FieldType.STRING).build()).addValue(str).build();
    }

    private static void assertMetadataTableHasBeenDropped() {
        try {
            ResultSet executeQuery = databaseClient.singleUse().executeQuery(Statement.of("SELECT * FROM " + metadataTableName), new Options.QueryOption[0]);
            Throwable th = null;
            try {
                executeQuery.next();
                Assert.fail("The metadata table " + metadataTableName + " should had been dropped, but it still exists");
                if (executeQuery != null) {
                    if (0 != 0) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
            } finally {
            }
        } catch (SpannerException e) {
            Assert.assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
            Assert.assertTrue("Error message must contain \"Table not found\"", e.getMessage().contains("Table not found"));
        }
    }

    private static Pair<Timestamp, Timestamp> insertRows(int i) {
        Timestamp insertRow = insertRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            insertRow(i2);
        }
        return Pair.of(insertRow, insertRow(i));
    }

    private static Pair<Timestamp, Timestamp> updateRows(int i) {
        Timestamp updateRow = updateRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            updateRow(i2);
        }
        return Pair.of(updateRow, updateRow(i));
    }

    private static Pair<Timestamp, Timestamp> deleteRows(int i) {
        Timestamp deleteRow = deleteRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            deleteRow(i2);
        }
        return Pair.of(deleteRow, deleteRow(i));
    }

    private static Timestamp insertRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertBuilder(changeStreamTableName).set("SingerId").to(i)).set("FirstName").to("First Name " + i)).set("LastName").to("Last Name " + i)).build()), new Options.TransactionOption[]{Options.tag("app=beam;action=insert")}).getCommitTimestamp();
    }

    private static Timestamp updateRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newUpdateBuilder(changeStreamTableName).set("SingerId").to(i)).set("FirstName").to("Updated First Name " + i)).set("LastName").to("Updated Last Name " + i)).build()), new Options.TransactionOption[]{Options.tag("app=beam;action=update")}).getCommitTimestamp();
    }

    private static Timestamp deleteRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(new Object[]{Integer.valueOf(i)}))), new Options.TransactionOption[]{Options.tag("app=beam;action=delete")}).getCommitTimestamp();
    }
}
