/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.IntegrationTestEnv;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class SpannerChangeStreamTransactionBoundariesIT {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamTransactionBoundariesIT.class);
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    private static String tableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    @BeforeClass
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();
        tableName = ENV.createSingersTable();
        changeStreamName = ENV.createChangeStreamFor(tableName);
        databaseClient = ENV.getDatabaseClient();
    }

    @Test
    public void testTransactionBoundaries() {
        LOG.info("Test pipeline: " + this.pipeline.toString());
        SpannerConfig spannerConfig = SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId);
        ArrayList<Mutation> mutations = new ArrayList<Mutation>();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(0L, "FirstName0", "LastName0"));
        Timestamp startTimestamp = databaseClient.write(mutations);
        Timestamp endTimestamp = this.writeTransactionsToDatabase();
        PCollection tokens = (PCollection)((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)SpannerIO.readChangeStream().withSpannerConfig(spannerConfig).withChangeStreamName(changeStreamName).withMetadataDatabase(databaseId).withInclusiveStartAt(startTimestamp).withInclusiveEndAt(endTimestamp))).apply((PTransform)ParDo.of((DoFn)new KeyByTransactionIdFn()))).apply((PTransform)ParDo.of((DoFn)new TransactionBoundaryFn()))).apply((PTransform)ParDo.of((DoFn)new ToStringFn()));
        PAssert.that((PCollection)tokens).containsInAnyOrder((Object[])new String[]{"{\"SingerId\":\"0\"},INSERT\n", "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n", "{\"SingerId\":\"1\"},DELETE\n{\"SingerId\":\"3\"},INSERT\n", "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"}{\"SingerId\":\"6\"},INSERT\n", "{\"SingerId\":\"6\"},UPDATE\n{\"SingerId\":\"7\"},INSERT\n", "{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},UPDATE\n", "{\"SingerId\":\"3\"}{\"SingerId\":\"4\"}{\"SingerId\":\"5\"},DELETE\n", "{\"SingerId\":\"0\"}{\"SingerId\":\"2\"}{\"SingerId\":\"6\"}{\"SingerId\":\"7\"},DELETE\n"});
        PipelineResult pipelineResult = this.pipeline.run();
        pipelineResult.waitUntilFinish();
    }

    private Timestamp writeTransactionsToDatabase() {
        ArrayList<Mutation> mutations = new ArrayList<Mutation>();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(1L, "FirstName1", "LastName2"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(2L, "FirstName2", "LastName2"));
        Timestamp t1 = databaseClient.write(mutations);
        LOG.debug("The first transaction committed with timestamp: " + t1.toString());
        mutations.clear();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(3L, "FirstName3", "LastName3"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(1L));
        Timestamp t2 = databaseClient.write(mutations);
        LOG.debug("The second transaction committed with timestamp: " + t2.toString());
        mutations.clear();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(4L, "FirstName4", "LastName4"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(5L, "FirstName5", "LastName5"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(6L, "FirstName6", "LastName6"));
        Timestamp t3 = databaseClient.write(mutations);
        LOG.debug("The third transaction committed with timestamp: " + t3.toString());
        mutations.clear();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.insertRecordMutation(7L, "FirstName7", "LastName7"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.updateRecordMutation(6L, "FirstName5", "LastName5"));
        Timestamp t4 = databaseClient.write(mutations);
        LOG.debug("The fourth transaction committed with timestamp: " + t4.toString());
        mutations.clear();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.updateRecordMutation(4L, "FirstName9", "LastName9"));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.updateRecordMutation(5L, "FirstName9", "LastName9"));
        Timestamp t5 = databaseClient.write(mutations);
        LOG.debug("The fifth transaction committed with timestamp: " + t5.toString());
        mutations.clear();
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(3L));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(4L));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(5L));
        Timestamp t6 = databaseClient.write(mutations);
        mutations.clear();
        LOG.debug("The sixth transaction committed with timestamp: " + t6.toString());
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(0L));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(2L));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(6L));
        mutations.add(SpannerChangeStreamTransactionBoundariesIT.deleteRecordMutation(7L));
        Timestamp t7 = databaseClient.write(mutations);
        LOG.debug("The seventh transaction committed with timestamp: " + t7.toString());
        return t7;
    }

    private static Mutation updateRecordMutation(long singerId, String firstName, String lastName) {
        return ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)tableName).set("SingerId").to(singerId)).set("FirstName").to(firstName)).set("LastName").to(lastName)).build();
    }

    private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
        return ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertBuilder((String)tableName).set("SingerId").to(singerId)).set("FirstName").to(firstName)).set("LastName").to(lastName)).build();
    }

    private static Mutation deleteRecordMutation(long singerId) {
        return Mutation.delete((String)tableName, (KeySet)KeySet.newBuilder().addKey(Key.of((Object[])new Object[]{singerId})).build());
    }

    private static class SortKey
    implements Serializable,
    Comparable<SortKey> {
        private static final long serialVersionUID = 2105939115467195036L;
        private Timestamp commitTimestamp;
        private String transactionId;

        public SortKey() {
        }

        public SortKey(Timestamp commitTimestamp, String transactionId) {
            this.commitTimestamp = commitTimestamp;
            this.transactionId = transactionId;
        }

        public static long getSerialVersionUID() {
            return 2105939115467195036L;
        }

        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        public void setCommitTimestamp(Timestamp commitTimestamp) {
            this.commitTimestamp = commitTimestamp;
        }

        public String getTransactionId() {
            return this.transactionId;
        }

        public void setTransactionId(String transactionId) {
            this.transactionId = transactionId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey)o;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp) && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        @Override
        public int compareTo(SortKey other) {
            return Comparator.comparingDouble(sortKey -> (double)sortKey.getCommitTimestamp().getSeconds() + (double)sortKey.getCommitTimestamp().getNanos() / 1.0E9).thenComparing(sortKey -> sortKey.getTransactionId()).compare(this, other);
        }
    }

    private static class ToStringFn
    extends DoFn<KV<SortKey, Iterable<DataChangeRecord>>, String> {
        private static final long serialVersionUID = 2307936669684679038L;

        private ToStringFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<SortKey, Iterable<DataChangeRecord>> element, DoFn.OutputReceiver<String> outputReceiver) {
            StringBuilder builder = new StringBuilder();
            Iterable sortedRecords = (Iterable)element.getValue();
            sortedRecords.forEach(record -> {
                String modString = "";
                for (Mod mod : record.getMods()) {
                    modString = modString + mod.getKeysJson();
                }
                builder.append(String.join((CharSequence)",", modString, record.getModType().toString()));
                builder.append("\n");
            });
            outputReceiver.output((Object)builder.toString());
        }
    }

    private static class TransactionBoundaryFn
    extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>> {
        private static final long serialVersionUID = 5050535558953049259L;
        @DoFn.StateId(value="buffer")
        private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();
        @DoFn.StateId(value="count")
        private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

        private TransactionBoundaryFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext context, @DoFn.StateId(value="buffer") BagState<DataChangeRecord> buffer, @DoFn.StateId(value="count") ValueState<Integer> countState) {
            KV element = (KV)context.element();
            DataChangeRecord record = (DataChangeRecord)element.getValue();
            buffer.add((Object)record);
            int count = countState.read() != null ? (Integer)countState.read() : 0;
            countState.write((Object)(++count));
            if ((long)count == record.getNumberOfRecordsInTransaction()) {
                List sortedRecords = StreamSupport.stream(buffer.read().spliterator(), false).sorted(Comparator.comparing(DataChangeRecord::getRecordSequence)).collect(Collectors.toList());
                Instant commitInstant = new Instant(((DataChangeRecord)sortedRecords.get(0)).getCommitTimestamp().toSqlTimestamp().getTime());
                context.outputWithTimestamp((Object)KV.of((Object)new SortKey(((DataChangeRecord)sortedRecords.get(0)).getCommitTimestamp(), ((DataChangeRecord)sortedRecords.get(0)).getServerTransactionId()), sortedRecords), commitInstant);
                buffer.clear();
                countState.clear();
            }
        }
    }

    private static class KeyByTransactionIdFn
    extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
        private static final long serialVersionUID = 1270485392415293532L;

        private KeyByTransactionIdFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
            outputReceiver.output((Object)KV.of((Object)record.getServerTransactionId(), (Object)record));
        }
    }
}

