/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.IntegrationTestEnv;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class SpannerChangeStreamOrderedWithinKeyIT {
    /** Logger for this integration test. */
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamOrderedWithinKeyIT.class);
    /** Class-level JUnit rule; {@code setup()} reads the project, instance, database, table, change stream and client from it. */
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
    /** Per-test Beam pipeline on which the change stream read is built and run. */
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    // The static fields below are assigned once in setup() from ENV and are read by the test and the mutation helpers.
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    private static String tableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    /**
     * Populates the static test fixtures from {@link #ENV} before any test runs.
     *
     * <p>Reads the project, instance and database ids, creates the singers table, creates a
     * change stream for that table, and obtains the database client used to write mutations.
     *
     * @throws InterruptedException declared by the original signature
     * @throws ExecutionException declared by the original signature
     * @throws TimeoutException declared by the original signature
     */
    @BeforeClass
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        // Coordinates of the database under test.
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();

        // The change stream is created for the table that was just created.
        final String singersTable = ENV.createSingersTable();
        tableName = singersTable;
        changeStreamName = ENV.createChangeStreamFor(singersTable);

        databaseClient = ENV.getDatabaseClient();
    }

    /**
     * Verifies that change records can be ordered per primary key.
     *
     * <p>The test inserts row 0, runs the transactions of {@link #writeTransactionsToDatabase()},
     * and reads the change stream between the commit timestamp of the first write and that of the
     * last one (both inclusive). Each record is split into one record per mod, keyed by the mod's
     * keys JSON, tagged with a {@link SortKey}, windowed into fixed two-minute windows, grouped by
     * key and rendered by {@link ToStringFn}.
     *
     * <p>Each expected string has the form {@code <keys JSON>\n<entry>;<entry>;...}, where an entry
     * is the new-values JSON of a mod, or {@code Deleted record} when that JSON is empty.
     *
     * <p>The test is currently disabled with {@code @Ignore}; the reason is not recorded here.
     */
    @Ignore
    @Test
    public void testOrderedWithinKey() {
        // Parameterized logging: the pipeline is only rendered if INFO is enabled.
        LOG.info("Test pipeline: {}", this.pipeline);

        final SpannerConfig spannerConfig = SpannerConfig.create()
                .withProjectId(projectId)
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId);

        // The commit timestamp of this first insert is the inclusive start of the read.
        final Timestamp startTimestamp =
                databaseClient.write(Collections.singletonList(insertRecordMutation(0L)));
        // The commit timestamp of the last transaction is the inclusive end of the read.
        final Timestamp endTimestamp = writeTransactionsToDatabase();

        final PCollection<String> tokens = this.pipeline
                .apply(SpannerIO.readChangeStream()
                        .withSpannerConfig(spannerConfig)
                        .withChangeStreamName(changeStreamName)
                        .withMetadataDatabase(databaseId)
                        .withInclusiveStartAt(startTimestamp)
                        .withInclusiveEndAt(endTimestamp))
                .apply(ParDo.of(new BreakRecordByModFn()))
                .apply(ParDo.of(new KeyByIdFn()))
                .apply(ParDo.of(new KeyValueByCommitTimestampAndRecordSequenceFn<String>()))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
                .apply(GroupByKey.create())
                .apply(ParDo.of(new ToStringFn()));

        PAssert.that(tokens).containsInAnyOrder(
                "{\"SingerId\":\"0\"}\n{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
                "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 1\"};Deleted record;{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
                "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 2\"};Deleted record;",
                "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 3\"};Deleted record;",
                "{\"SingerId\":\"4\"}\n{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
                "{\"SingerId\":\"5\"}\n{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 5\"};Deleted record;");

        this.pipeline.run().waitUntilFinish();
    }

    /**
     * Writes seven consecutive transactions against the singers table and returns the commit
     * timestamp of the last one.
     *
     * <p>The transactions insert, update and delete rows 1 through 6 and finally delete rows 1, 2
     * and 0. Row 6 is inserted and deleted inside the same (sixth) transaction.
     *
     * @return the commit timestamp of the seventh transaction
     */
    private static Timestamp writeTransactionsToDatabase() {
        commitTransaction("first",
                insertRecordMutation(1L),
                insertRecordMutation(2L));
        commitTransaction("second",
                updateRecordMutation(1L),
                insertRecordMutation(4L));
        commitTransaction("third",
                deleteRecordMutation(1L),
                insertRecordMutation(3L),
                insertRecordMutation(5L),
                updateRecordMutation(5L));
        commitTransaction("fourth",
                updateRecordMutation(3L),
                updateRecordMutation(2L));
        commitTransaction("fifth",
                deleteRecordMutation(4L),
                insertRecordMutation(1L),
                deleteRecordMutation(3L),
                updateRecordMutation(5L));
        commitTransaction("sixth",
                deleteRecordMutation(5L),
                insertRecordMutation(6L),
                deleteRecordMutation(6L));
        return commitTransaction("seventh",
                deleteRecordMutation(1L),
                deleteRecordMutation(2L),
                deleteRecordMutation(0L));
    }

    /**
     * Writes the given mutations in a single {@code databaseClient.write} call and logs the
     * resulting commit timestamp at debug level.
     *
     * @param ordinal ordinal word used in the log message (for example {@code "first"})
     * @param mutations mutations to write, in the order given
     * @return the commit timestamp returned by the write
     */
    private static Timestamp commitTransaction(String ordinal, Mutation... mutations) {
        final List<Mutation> batch = new ArrayList<>(mutations.length);
        Collections.addAll(batch, mutations);
        final Timestamp commitTimestamp = databaseClient.write(batch);
        // Parameterized so the message is only built when debug logging is enabled.
        LOG.debug("The {} transaction committed with timestamp: {}", ordinal, commitTimestamp);
        return commitTimestamp;
    }

    /**
     * Builds an update mutation on the singers table that sets {@code FirstName} to
     * {@code "Updating mutation <singerId>"} for the given {@code SingerId}.
     *
     * @param singerId value written to the {@code SingerId} column
     * @return the update mutation
     */
    private static Mutation updateRecordMutation(long singerId) {
        final String firstName = "Updating mutation " + singerId;
        return Mutation.newUpdateBuilder(tableName)
                .set("SingerId").to(singerId)
                .set("FirstName").to(firstName)
                .build();
    }

    /**
     * Builds an insert mutation on the singers table with the given {@code SingerId} and
     * {@code FirstName} set to {@code "Inserting mutation <singerId>"}.
     *
     * @param singerId value written to the {@code SingerId} column
     * @return the insert mutation
     */
    private static Mutation insertRecordMutation(long singerId) {
        final String firstName = "Inserting mutation " + singerId;
        return Mutation.newInsertBuilder(tableName)
                .set("SingerId").to(singerId)
                .set("FirstName").to(firstName)
                .build();
    }

    /**
     * Builds a delete mutation on the singers table for the single key made of the given id.
     *
     * @param singerId the key value of the row to delete
     * @return the delete mutation
     */
    private static Mutation deleteRecordMutation(long singerId) {
        final Key rowKey = Key.of(singerId);
        final KeySet singleRow = KeySet.newBuilder().addKey(rowKey).build();
        return Mutation.delete(tableName, singleRow);
    }

    /**
     * Serializable sort key made of a commit timestamp and a transaction id.
     *
     * <p>Keys order by commit timestamp first and by transaction id second. The ordering uses the
     * timestamp's own {@code compareTo}, so it is exact and agrees with {@link #equals(Object)}.
     * Folding seconds and nanoseconds into a {@code double} would not be: at current epoch values
     * a double resolves only a few hundred nanoseconds, so distinct timestamps could compare equal.
     *
     * <p>{@link #compareTo(SortKey)} throws {@link NullPointerException} if either key has a null
     * commit timestamp or transaction id, which is possible after using the no-arg constructor.
     */
    public static class SortKey
    implements Serializable,
    Comparable<SortKey> {
        private static final long serialVersionUID = 6923689796764239980L;
        /** Commit timestamp first, transaction id as tie-breaker. Static, so not part of the serialized form. */
        private static final Comparator<SortKey> ORDERING = Comparator.comparing(SortKey::getCommitTimestamp).thenComparing(SortKey::getTransactionId);
        private Timestamp commitTimestamp;
        private String transactionId;

        /** Creates a key with both fields unset (null). */
        public SortKey() {
        }

        /**
         * @param commitTimestamp commit timestamp used as the primary sort component
         * @param transactionId transaction id used as the secondary sort component
         */
        public SortKey(Timestamp commitTimestamp, String transactionId) {
            this.commitTimestamp = commitTimestamp;
            this.transactionId = transactionId;
        }

        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        public void setCommitTimestamp(Timestamp commitTimestamp) {
            this.commitTimestamp = commitTimestamp;
        }

        public String getTransactionId() {
            return this.transactionId;
        }

        public void setTransactionId(String transactionId) {
            this.transactionId = transactionId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey)o;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp) && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        /**
         * Compares by commit timestamp, then by transaction id.
         *
         * @param other the key to compare against
         * @return a negative, zero or positive value as this key sorts before, equal to or after {@code other}
         */
        @Override
        public int compareTo(SortKey other) {
            return ORDERING.compare(this, other);
        }
    }

    /**
     * Renders all records grouped under one key as a single string.
     *
     * <p>The output is the group key, a newline, and then one entry per record in ascending
     * {@link SortKey} order. An entry is the new-values JSON of the record's first mod followed by
     * {@code ;}, or {@code Deleted record;} when that JSON is empty.
     */
    private static class ToStringFn
    extends DoFn<KV<String, Iterable<KV<SortKey, DataChangeRecord>>>, String> {
        private static final long serialVersionUID = -2573561902102768101L;

        private ToStringFn() {
        }

        /**
         * @param recordsByKey the group key and the records grouped under it
         * @param outputReceiver receives the rendered string for the group
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, Iterable<KV<SortKey, DataChangeRecord>>> recordsByKey, DoFn.OutputReceiver<String> outputReceiver) {
            final List<KV<SortKey, DataChangeRecord>> sortedRecords = StreamSupport.stream(recordsByKey.getValue().spliterator(), false)
                    .sorted(Comparator.comparing(KV::getKey))
                    .collect(Collectors.toList());
            final StringBuilder builder = new StringBuilder();
            builder.append(recordsByKey.getKey());
            builder.append("\n");
            for (KV<SortKey, DataChangeRecord> record : sortedRecords) {
                // Only the first mod is inspected; read its new-values JSON once.
                final String newValuesJson = record.getValue().getMods().get(0).getNewValuesJson();
                builder.append(newValuesJson.isEmpty() ? "Deleted record;" : newValuesJson + ";");
            }
            outputReceiver.output(builder.toString());
        }
    }

    /**
     * Pairs each keyed record with a {@link SortKey} so the records of a key can be ordered after
     * grouping.
     *
     * <p>Despite the class name, the sort key is built from the record's commit timestamp and
     * server transaction id; the record sequence is not used.
     *
     * @param <K> type of the grouping key, passed through unchanged
     */
    private static class KeyValueByCommitTimestampAndRecordSequenceFn<K>
    extends DoFn<KV<K, DataChangeRecord>, KV<K, KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = -4059137464869088056L;

        private KeyValueByCommitTimestampAndRecordSequenceFn() {
        }

        /**
         * @param recordByKey the grouping key and its data change record
         * @param outputReceiver receives the same key mapped to (sort key, record)
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, DataChangeRecord> recordByKey, DoFn.OutputReceiver<KV<K, KV<SortKey, DataChangeRecord>>> outputReceiver) {
            final K key = recordByKey.getKey();
            final DataChangeRecord record = recordByKey.getValue();
            final SortKey sortKey = new SortKey(record.getCommitTimestamp(), record.getServerTransactionId());
            outputReceiver.output(KV.of(key, KV.of(sortKey, record)));
        }
    }

    /**
     * Keys each data change record by the keys JSON of its first mod.
     *
     * <p>Only the first mod is read. In this test the function runs after
     * {@link BreakRecordByModFn}, which emits records holding exactly one mod.
     */
    private static class KeyByIdFn
    extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
        private static final long serialVersionUID = 5121794565566403405L;

        private KeyByIdFn() {
        }

        /**
         * @param record the data change record to key
         * @param outputReceiver receives the record keyed by its first mod's keys JSON
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
            final String keysJson = record.getMods().get(0).getKeysJson();
            outputReceiver.output(KV.of(keysJson, record));
        }
    }

    /**
     * Splits a data change record into one record per mod.
     *
     * <p>Every emitted record copies the fields of the input record, carries a single-element mod
     * list, and has its metadata replaced by a fixed placeholder built from constant values.
     */
    private static class BreakRecordByModFn
    extends DoFn<DataChangeRecord, DataChangeRecord> {
        private static final long serialVersionUID = 543765692722095666L;

        private BreakRecordByModFn() {
        }

        /**
         * @param record the input record, possibly holding several mods
         * @param outputReceiver receives one single-mod record per mod, in mod order
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<DataChangeRecord> outputReceiver) {
            // Placeholder metadata with constant values, shared by all records emitted for this input.
            final ChangeStreamRecordMetadata fakeChangeStreamMetadata = ChangeStreamRecordMetadata.newBuilder()
                    .withPartitionToken("1")
                    .withRecordTimestamp(Timestamp.ofTimeMicroseconds(2L))
                    .withPartitionStartTimestamp(Timestamp.ofTimeMicroseconds(3L))
                    .withPartitionEndTimestamp(Timestamp.ofTimeMicroseconds(4L))
                    .withPartitionCreatedAt(Timestamp.ofTimeMicroseconds(5L))
                    .withPartitionScheduledAt(Timestamp.ofTimeMicroseconds(6L))
                    .withPartitionRunningAt(Timestamp.ofTimeMicroseconds(7L))
                    .withQueryStartedAt(Timestamp.ofTimeMicroseconds(8L))
                    .withRecordStreamStartedAt(Timestamp.ofTimeMicroseconds(9L))
                    .withRecordStreamEndedAt(Timestamp.ofTimeMicroseconds(10L))
                    .withRecordReadAt(Timestamp.ofTimeMicroseconds(11L))
                    .withTotalStreamTimeMillis(12L)
                    .withNumberOfRecordsRead(13L)
                    .build();

            for (Mod mod : record.getMods()) {
                final DataChangeRecord singleModRecord = new DataChangeRecord(
                        record.getPartitionToken(),
                        record.getCommitTimestamp(),
                        record.getServerTransactionId(),
                        record.isLastRecordInTransactionInPartition(),
                        record.getRecordSequence(),
                        record.getTableName(),
                        record.getRowType(),
                        Collections.singletonList(mod),
                        record.getModType(),
                        record.getValueCaptureType(),
                        record.getNumberOfRecordsInTransaction(),
                        record.getNumberOfPartitionsInTransaction(),
                        fakeChangeStreamMetadata);
                outputReceiver.output(singleModRecord);
            }
        }
    }
}

