package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.gson.Gson;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.class */
public class SpannerChangeStreamIT {

    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static String instanceId;
    private static String projectId;
    private static String databaseId;
    private static String metadataTableName;
    private static String changeStreamTableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT$ModsToString.class */
    public static class ModsToString extends DoFn<DataChangeRecord, String> {
        private transient Gson gson;

        private ModsToString() {
        }

        @DoFn.Setup
        public void setup() {
            this.gson = new Gson();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<String> outputReceiver) {
            Mod mod = (Mod) dataChangeRecord.getMods().get(0);
            Map map = (Map) this.gson.fromJson(mod.getKeysJson(), Map.class);
            Map map2 = (Map) Optional.ofNullable(mod.getOldValuesJson()).map(str -> {
                return (Map) this.gson.fromJson(str, Map.class);
            }).orElseGet(Collections::emptyMap);
            Map map3 = (Map) Optional.ofNullable(mod.getNewValuesJson()).map(str2 -> {
                return (Map) this.gson.fromJson(str2, Map.class);
            }).orElseGet(Collections::emptyMap);
            outputReceiver.outputWithTimestamp(String.join(",", dataChangeRecord.getModType().toString(), (CharSequence) map.get("SingerId"), (CharSequence) map2.get("FirstName"), (CharSequence) map2.get("LastName"), (CharSequence) map3.get("FirstName"), (CharSequence) map3.get("LastName")), new Instant(dataChangeRecord.getRecordTimestamp().toSqlTimestamp()));
        }
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();
        metadataTableName = ENV.getMetadataTableName();
        changeStreamTableName = ENV.createSingersTable();
        changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
        databaseClient = ENV.getDatabaseClient();
        ENV.createMetadataDatabase();
        ENV.createRoleAndGrantPrivileges(changeStreamTableName, changeStreamName);
    }

    @Before
    public void before() {
        this.pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setStreaming(true);
        this.pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
    }

    @Test
    public void testReadSpannerChangeStream() {
        testReadSpannerChangeStreamImpl(this.pipeline, null);
    }

    @Test
    public void testReadSpannerChangeStreamWithAuthorizedRole() {
        testReadSpannerChangeStreamImpl(this.pipeline, ENV.getDatabaseRole());
    }

    @Test
    public void testReadSpannerChangeStreamWithUnauthorizedRole() {
        Assume.assumeTrue(this.pipeline.getOptions().getRunner() == DirectRunner.class);
        this.exception.expect(SpannerException.class);
        this.exception.expectMessage("Role not found: bad_role.");
        testReadSpannerChangeStreamImpl(this.pipeline.enableAbandonedNodeEnforcement(false), "bad_role");
    }

    public void testReadSpannerChangeStreamImpl(TestPipeline testPipeline, String str) {
        Timestamp timestamp = (Timestamp) insertRows(5).getLeft();
        updateRows(5);
        Timestamp timestamp2 = (Timestamp) deleteRows(5).getRight();
        SpannerConfig withDatabaseId = SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId);
        if (str != null) {
            withDatabaseId = withDatabaseId.withDatabaseRole(ValueProvider.StaticValueProvider.of(str));
        }
        PAssert.that(testPipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(withDatabaseId).withChangeStreamName(changeStreamName).withMetadataDatabase(ENV.getMetadataDatabaseId()).withMetadataTable(metadataTableName).withInclusiveStartAt(timestamp).withInclusiveEndAt(timestamp2)).apply(ParDo.of(new ModsToString()))).containsInAnyOrder(new String[]{"INSERT,1,null,null,First Name 1,Last Name 1", "INSERT,2,null,null,First Name 2,Last Name 2", "INSERT,3,null,null,First Name 3,Last Name 3", "INSERT,4,null,null,First Name 4,Last Name 4", "INSERT,5,null,null,First Name 5,Last Name 5", "UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated Last Name 1", "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated Last Name 2", "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated Last Name 3", "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated Last Name 4", "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated Last Name 5", "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null", "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null", "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null", "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null", "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null"});
        testPipeline.run().waitUntilFinish();
        assertMetadataTableHasBeenDropped();
    }

    @Test
    public void testReadSpannerChangeStreamFilteredByTransactionTag() {
        Timestamp timestamp = (Timestamp) insertRows(5).getLeft();
        updateRows(5);
        PAssert.that(this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId)).withChangeStreamName(changeStreamName).withMetadataDatabase(ENV.getMetadataDatabaseId()).withMetadataTable(metadataTableName).withInclusiveStartAt(timestamp).withInclusiveEndAt((Timestamp) deleteRows(5).getRight())).apply(Filter.by(dataChangeRecord -> {
            return Boolean.valueOf(!dataChangeRecord.isSystemTransaction() && dataChangeRecord.getTransactionTag().equalsIgnoreCase("app=beam;action=update"));
        })).apply(ParDo.of(new ModsToString()))).satisfies(iterable -> {
            Set set = (Set) StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toSet());
            Stream of = Stream.of((Object[]) new String[]{"UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated Last Name 1", "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated Last Name 2", "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated Last Name 3", "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated Last Name 4", "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated Last Name 5"});
            Objects.requireNonNull(set);
            Assert.assertTrue(of.allMatch((v1) -> {
                return r1.contains(v1);
            }));
            Stream of2 = Stream.of((Object[]) new String[]{"INSERT,1,null,null,First Name 1,Last Name 1", "INSERT,2,null,null,First Name 2,Last Name 2", "INSERT,3,null,null,First Name 3,Last Name 3", "INSERT,4,null,null,First Name 4,Last Name 4", "INSERT,5,null,null,First Name 5,Last Name 5", "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null", "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null", "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null", "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null", "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null"});
            Objects.requireNonNull(set);
            Assert.assertTrue(of2.noneMatch((v1) -> {
                return r1.contains(v1);
            }));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        assertMetadataTableHasBeenDropped();
    }

    private static void assertMetadataTableHasBeenDropped() {
        try {
            ResultSet executeQuery = databaseClient.singleUse().executeQuery(Statement.of("SELECT * FROM " + metadataTableName), new Options.QueryOption[0]);
            Throwable th = null;
            try {
                executeQuery.next();
                Assert.fail("The metadata table " + metadataTableName + " should had been dropped, but it still exists");
                if (executeQuery != null) {
                    if (0 != 0) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
            } finally {
            }
        } catch (SpannerException e) {
            Assert.assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
            Assert.assertTrue("Error message must contain \"Table not found\"", e.getMessage().contains("Table not found"));
        }
    }

    private static Pair<Timestamp, Timestamp> insertRows(int i) {
        Timestamp insertRow = insertRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            insertRow(i2);
        }
        return Pair.of(insertRow, insertRow(i));
    }

    private static Pair<Timestamp, Timestamp> updateRows(int i) {
        Timestamp updateRow = updateRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            updateRow(i2);
        }
        return Pair.of(updateRow, updateRow(i));
    }

    private static Pair<Timestamp, Timestamp> deleteRows(int i) {
        Timestamp deleteRow = deleteRow(1);
        for (int i2 = 2; i2 < i; i2++) {
            deleteRow(i2);
        }
        return Pair.of(deleteRow, deleteRow(i));
    }

    private static Timestamp insertRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertBuilder(changeStreamTableName).set("SingerId").to(i)).set("FirstName").to("First Name " + i)).set("LastName").to("Last Name " + i)).build()), new Options.TransactionOption[]{Options.tag("app=beam;action=insert")}).getCommitTimestamp();
    }

    private static Timestamp updateRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newUpdateBuilder(changeStreamTableName).set("SingerId").to(i)).set("FirstName").to("Updated First Name " + i)).set("LastName").to("Updated Last Name " + i)).build()), new Options.TransactionOption[]{Options.tag("app=beam;action=update")}).getCommitTimestamp();
    }

    private static Timestamp deleteRow(int i) {
        return databaseClient.writeWithOptions(Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(new Object[]{Integer.valueOf(i)}))), new Options.TransactionOption[]{Options.tag("app=beam;action=delete")}).getCommitTimestamp();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1498589182:
                if (implMethodName.equals("lambda$testReadSpannerChangeStreamFilteredByTransactionTag$2d59d67f$1")) {
                    z = true;
                    break;
                }
                break;
            case 258344298:
                if (implMethodName.equals("lambda$testReadSpannerChangeStreamFilteredByTransactionTag$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Set set = (Set) StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toSet());
                        Stream of = Stream.of((Object[]) new String[]{"UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated Last Name 1", "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated Last Name 2", "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated Last Name 3", "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated Last Name 4", "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated Last Name 5"});
                        Objects.requireNonNull(set);
                        Assert.assertTrue(of.allMatch((v1) -> {
                            return r1.contains(v1);
                        }));
                        Stream of2 = Stream.of((Object[]) new String[]{"INSERT,1,null,null,First Name 1,Last Name 1", "INSERT,2,null,null,First Name 2,Last Name 2", "INSERT,3,null,null,First Name 3,Last Name 3", "INSERT,4,null,null,First Name 4,Last Name 4", "INSERT,5,null,null,First Name 5,Last Name 5", "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null", "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null", "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null", "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null", "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null"});
                        Objects.requireNonNull(set);
                        Assert.assertTrue(of2.noneMatch((v1) -> {
                            return r1.contains(v1);
                        }));
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/spanner/changestreams/model/DataChangeRecord;)Ljava/lang/Boolean;")) {
                    return dataChangeRecord -> {
                        return Boolean.valueOf(!dataChangeRecord.isSystemTransaction() && dataChangeRecord.getTransactionTag().equalsIgnoreCase("app=beam;action=update"));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
