/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.gson.Gson;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.ChangeStreamTestPipelineOptions;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.IntegrationTestEnv;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * End-to-end integration test for {@code SpannerIO.readChangeStream()}.
 *
 * <p>The test writes a known sequence of inserts, updates and deletes into a table that is
 * tracked by a change stream, reads the change stream for the time range covering those writes,
 * and asserts that exactly the expected data change records are emitted.
 */
@RunWith(JUnit4.class)
public class SpannerChangeStreamIT {
    /** Shared test environment that supplies the Spanner identifiers and test resources. */
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static String instanceId;
    private static String projectId;
    private static String databaseId;
    private static String metadataTableName;
    private static String changeStreamTableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    /**
     * Reads the Spanner identifiers from {@link #ENV} and creates the table and the change stream
     * used by the test.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();
        metadataTableName = ENV.getMetadataTableName();
        changeStreamTableName = ENV.createSingersTable();
        changeStreamName = ENV.createChangeStreamFor(changeStreamTableName);
        databaseClient = ENV.getDatabaseClient();
    }

    /** Configures the pipeline options as streaming and non-blocking on run. */
    @Before
    public void before() {
        ChangeStreamTestPipelineOptions options =
            pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class);
        options.setStreaming(true);
        // The test waits for completion explicitly through waitUntilFinish().
        options.setBlockOnRun(false);
    }

    /**
     * Inserts, updates and deletes {@code numRows} rows, then reads the change stream between the
     * first insert commit timestamp and the last delete commit timestamp (both inclusive) and
     * checks the emitted records. Finally verifies that the metadata table no longer exists.
     */
    @Test
    public void testReadSpannerChangeStream() {
        // Number of rows inserted / updated / deleted. The expected records listed below are
        // written out for exactly this many rows and must be kept in sync with this value.
        final int numRows = 5;

        // The first insert commit timestamp is the start of the range to read.
        Pair<Timestamp, Timestamp> insertTimestamps = insertRows(numRows);
        Timestamp startAt = insertTimestamps.getLeft();
        updateRows(numRows);
        // The last delete commit timestamp is the end of the range to read.
        Pair<Timestamp, Timestamp> deleteTimestamps = deleteRows(numRows);
        Timestamp endAt = deleteTimestamps.getRight();

        SpannerConfig spannerConfig =
            SpannerConfig.create()
                .withProjectId(projectId)
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId);

        PCollection<String> tokens =
            pipeline
                .apply(
                    SpannerIO.readChangeStream()
                        .withSpannerConfig(spannerConfig)
                        .withChangeStreamName(changeStreamName)
                        .withMetadataDatabase(databaseId)
                        .withMetadataTable(metadataTableName)
                        .withInclusiveStartAt(startAt)
                        .withInclusiveEndAt(endAt))
                .apply(ParDo.of(new ModsToString()));

        // Each expected element has the form:
        // <mod type>,<singer id>,<old first name>,<old last name>,<new first name>,<new last name>
        PAssert.that(tokens)
            .containsInAnyOrder(
                "INSERT,1,null,null,First Name 1,Last Name 1",
                "INSERT,2,null,null,First Name 2,Last Name 2",
                "INSERT,3,null,null,First Name 3,Last Name 3",
                "INSERT,4,null,null,First Name 4,Last Name 4",
                "INSERT,5,null,null,First Name 5,Last Name 5",
                "UPDATE,1,First Name 1,Last Name 1,Updated First Name 1,Updated Last Name 1",
                "UPDATE,2,First Name 2,Last Name 2,Updated First Name 2,Updated Last Name 2",
                "UPDATE,3,First Name 3,Last Name 3,Updated First Name 3,Updated Last Name 3",
                "UPDATE,4,First Name 4,Last Name 4,Updated First Name 4,Updated Last Name 4",
                "UPDATE,5,First Name 5,Last Name 5,Updated First Name 5,Updated Last Name 5",
                "DELETE,1,Updated First Name 1,Updated Last Name 1,null,null",
                "DELETE,2,Updated First Name 2,Updated Last Name 2,null,null",
                "DELETE,3,Updated First Name 3,Updated Last Name 3,null,null",
                "DELETE,4,Updated First Name 4,Updated Last Name 4,null,null",
                "DELETE,5,Updated First Name 5,Updated Last Name 5,null,null");
        pipeline.run().waitUntilFinish();

        assertMetadataTableHasBeenDropped();
    }

    /**
     * Asserts that querying the metadata table fails with {@link ErrorCode#INVALID_ARGUMENT} and
     * a "Table not found" message. If the query succeeds, the test fails.
     */
    private static void assertMetadataTableHasBeenDropped() {
        try (ResultSet resultSet =
            databaseClient.singleUse().executeQuery(Statement.of("SELECT * FROM " + metadataTableName))) {
            // Advancing the result set forces the query to execute.
            resultSet.next();
            Assert.fail("The metadata table " + metadataTableName + " should had been dropped, but it still exists");
        } catch (SpannerException e) {
            Assert.assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
            Assert.assertTrue(
                "Error message must contain \"Table not found\"",
                e.getMessage().contains("Table not found"));
        }
    }

    /**
     * Inserts rows with singer ids {@code 1..n}, one write per row.
     *
     * @param n number of rows to insert; must be at least 1
     * @return the commit timestamps of the first (left) and last (right) insert
     * @throws IllegalArgumentException if {@code n < 1}
     */
    private static Pair<Timestamp, Timestamp> insertRows(int n) {
        checkRowCount(n);
        Timestamp firstCommitTimestamp = insertRow(1);
        Timestamp lastCommitTimestamp = firstCommitTimestamp;
        for (int singerId = 2; singerId <= n; singerId++) {
            lastCommitTimestamp = insertRow(singerId);
        }
        return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
    }

    /**
     * Updates rows with singer ids {@code 1..n}, one write per row.
     *
     * @param n number of rows to update; must be at least 1
     * @return the commit timestamps of the first (left) and last (right) update
     * @throws IllegalArgumentException if {@code n < 1}
     */
    private static Pair<Timestamp, Timestamp> updateRows(int n) {
        checkRowCount(n);
        Timestamp firstCommitTimestamp = updateRow(1);
        Timestamp lastCommitTimestamp = firstCommitTimestamp;
        for (int singerId = 2; singerId <= n; singerId++) {
            lastCommitTimestamp = updateRow(singerId);
        }
        return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
    }

    /**
     * Deletes rows with singer ids {@code 1..n}, one write per row.
     *
     * @param n number of rows to delete; must be at least 1
     * @return the commit timestamps of the first (left) and last (right) delete
     * @throws IllegalArgumentException if {@code n < 1}
     */
    private static Pair<Timestamp, Timestamp> deleteRows(int n) {
        checkRowCount(n);
        Timestamp firstCommitTimestamp = deleteRow(1);
        Timestamp lastCommitTimestamp = firstCommitTimestamp;
        for (int singerId = 2; singerId <= n; singerId++) {
            lastCommitTimestamp = deleteRow(singerId);
        }
        return Pair.of(firstCommitTimestamp, lastCommitTimestamp);
    }

    /** Rejects row counts for which there would be no first and last commit timestamp. */
    private static void checkRowCount(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("Number of rows must be at least 1, but was " + n);
        }
    }

    /** Inserts a single row for the given singer id and returns the commit timestamp. */
    private static Timestamp insertRow(int singerId) {
        return databaseClient.write(
            Collections.singletonList(
                Mutation.newInsertBuilder(changeStreamTableName)
                    .set("SingerId")
                    .to((long) singerId)
                    .set("FirstName")
                    .to("First Name " + singerId)
                    .set("LastName")
                    .to("Last Name " + singerId)
                    .build()));
    }

    /** Updates the names of the row for the given singer id and returns the commit timestamp. */
    private static Timestamp updateRow(int singerId) {
        return databaseClient.write(
            Collections.singletonList(
                Mutation.newUpdateBuilder(changeStreamTableName)
                    .set("SingerId")
                    .to((long) singerId)
                    .set("FirstName")
                    .to("Updated First Name " + singerId)
                    .set("LastName")
                    .to("Updated Last Name " + singerId)
                    .build()));
    }

    /** Deletes the row for the given singer id and returns the commit timestamp. */
    private static Timestamp deleteRow(int singerId) {
        return databaseClient.write(
            Collections.singletonList(Mutation.delete(changeStreamTableName, Key.of(singerId))));
    }

    /**
     * Converts every mod of a {@link DataChangeRecord} into a comma separated string of the form
     * {@code <mod type>,<singer id>,<old first name>,<old last name>,<new first name>,<new last name>}.
     * Missing values are rendered as the text {@code null}. Each string is emitted with the
     * record timestamp as its element timestamp.
     */
    private static class ModsToString
    extends DoFn<DataChangeRecord, String> {
        // Created in setup() rather than serialized with the DoFn.
        private transient Gson gson;

        @DoFn.Setup
        public void setup() {
            this.gson = new Gson();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<String> outputReceiver) {
            Instant timestamp = new Instant(record.getRecordTimestamp().toSqlTimestamp());
            String modType = record.getModType().toString();
            // Emit one element per mod so that no mod of the record is silently dropped.
            for (Mod mod : record.getMods()) {
                Map<?, ?> keys = parseJsonObject(mod.getKeysJson());
                Map<?, ?> oldValues = parseJsonObject(mod.getOldValuesJson());
                Map<?, ?> newValues = parseJsonObject(mod.getNewValuesJson());
                // String.valueOf renders absent entries as "null", which the expected
                // strings in the test rely on.
                String modsAsString =
                    String.join(
                        ",",
                        modType,
                        String.valueOf(keys.get("SingerId")),
                        String.valueOf(oldValues.get("FirstName")),
                        String.valueOf(oldValues.get("LastName")),
                        String.valueOf(newValues.get("FirstName")),
                        String.valueOf(newValues.get("LastName")));
                outputReceiver.outputWithTimestamp(modsAsString, timestamp);
            }
        }

        /**
         * Parses a JSON object into a map.
         *
         * @param json the JSON text, possibly {@code null}
         * @return the parsed map, or an empty map when {@code json} is {@code null} or Gson
         *     yields no object for it
         */
        private Map<?, ?> parseJsonObject(String json) {
            Map<?, ?> parsed = json == null ? null : this.gson.fromJson(json, Map.class);
            return parsed == null ? Collections.emptyMap() : parsed;
        }
    }
}

