/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.IntegrationTestEnv;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class SpannerChangeStreamOrderedByTimestampAndTransactionIdIT {
    // Logger shared by the test body and the nested DoFns.
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class);
    // Class-level rule; setup() reads the project/instance/database ids from it and uses it to
    // create the table, the change stream and the database client.
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
    // Pipeline under test, registered as a per-test JUnit rule.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    // The fields below are all assigned once in setup() and read by the test and its helpers.
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    // Table targeted by insertRecordMutation / deleteRecordMutation.
    private static String tableName;
    // Change stream created for tableName; read by the pipeline in the test.
    private static String changeStreamName;
    // Client used to commit the test mutations.
    private static DatabaseClient databaseClient;

    /**
     * Populates the static fixtures from {@link #ENV} before any test runs: the project, instance
     * and database ids, the table returned by {@code createSingersTable()}, the change stream
     * returned by {@code createChangeStreamFor(table)}, and the database client.
     */
    @BeforeClass
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();

        // The change stream is created for the table that was just created.
        final String createdTable = ENV.createSingersTable();
        tableName = createdTable;
        changeStreamName = ENV.createChangeStreamFor(createdTable);

        databaseClient = ENV.getDatabaseClient();
    }

    /**
     * Writes three batches of transactions, pausing between batches, then reads them back through
     * the change stream and checks the buffered output.
     *
     * <p>The pipeline keys every record by (commit timestamp, transaction id), funnels all records
     * onto one artificial key, buffers them with an event-time timer, and renders every emitted
     * buffer as a single string. The assertion expects three such strings, in any order.
     */
    @Test
    public void testTransactionBoundaries() {
        final SpannerConfig spannerConfig =
            SpannerConfig.create()
                .withProjectId(projectId)
                .withInstanceId(instanceId)
                .withDatabaseId(databaseId);

        // Used both as the pause between write batches and as the buffering interval of the
        // pipeline, so the two stay in sync.
        final long timeIncrementInSeconds = 2L;

        // The commit timestamp of this first insert is the inclusive start of the read.
        final List<Mutation> mutations = new ArrayList<>();
        mutations.add(insertRecordMutation(0L, "FirstName0", "LastName0"));
        final Timestamp startTimestamp = databaseClient.write(mutations);

        this.writeTransactionsToDatabase();
        pauseSeconds(timeIncrementInSeconds);
        this.writeTransactionsToDatabase();
        pauseSeconds(timeIncrementInSeconds);
        // The last commit of the last batch is the inclusive end of the read.
        final Timestamp endTimestamp = this.writeTransactionsToDatabase();

        final PCollection<String> tokens =
            this.pipeline
                .apply(
                    SpannerIO.readChangeStream()
                        .withSpannerConfig(spannerConfig)
                        .withChangeStreamName(changeStreamName)
                        .withMetadataDatabase(databaseId)
                        .withInclusiveStartAt(startTimestamp)
                        .withInclusiveEndAt(endTimestamp))
                .apply(ParDo.of(new KeyBySortKeyFn()))
                .apply(ParDo.of(new CreateArtificialKeyFn()))
                .apply(ParDo.of(new BufferRecordsUntilOutputTimestamp(endTimestamp, timeIncrementInSeconds)))
                .apply(ParDo.of(new ToStringFn()));

        // Records expected from one call to writeTransactionsToDatabase(), excluding singer 0.
        final String transactionCycle =
            "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n"
                + "{\"SingerId\":\"1\"},DELETE\n"
                + "{\"SingerId\":\"3\"},INSERT\n"
                + "{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n";
        // Only the first expected string carries the insert and delete of singer 0.
        final String firstBatch =
            "{\"SingerId\":\"0\"},INSERT\n" + transactionCycle + "{\"SingerId\":\"0\"},DELETE\n";

        PAssert.that(tokens).containsInAnyOrder(firstBatch, transactionCycle, transactionCycle);
        this.pipeline.runWithAdditionalOptionArgs(Collections.singletonList("--streaming")).waitUntilFinish();
    }

    /**
     * Sleeps for the given number of seconds. An interruption is logged and the thread's interrupt
     * flag is restored so that callers further up the stack can still observe it.
     */
    private static void pauseSeconds(long seconds) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            LOG.error(e.toString(), (Throwable) e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Renders a record as one line: the keys JSON of every mod concatenated without a separator,
     * then a comma, the mod type, and a trailing newline.
     */
    private static String getRecordString(DataChangeRecord record) {
        final StringBuilder line = new StringBuilder();
        for (Mod mod : record.getMods()) {
            line.append(mod.getKeysJson());
        }
        return line.append(',').append(record.getModType().toString()).append('\n').toString();
    }

    /**
     * Commits four transactions in sequence against the test table and returns the commit
     * timestamp of the last one.
     *
     * <ol>
     *   <li>insert singers 1 and 2</li>
     *   <li>insert singer 3 and delete singer 1</li>
     *   <li>delete singers 2 and 3</li>
     *   <li>delete singer 0</li>
     * </ol>
     *
     * @return the commit timestamp of the fourth transaction
     */
    private Timestamp writeTransactionsToDatabase() {
        final List<Mutation> mutations = new ArrayList<>();

        mutations.add(insertRecordMutation(1L, "FirstName1", "LastName2"));
        mutations.add(insertRecordMutation(2L, "FirstName2", "LastName2"));
        final Timestamp t1 = databaseClient.write(mutations);
        // Placeholders defer message formatting until debug logging is actually enabled.
        LOG.debug("The first transaction committed with timestamp: {}", t1);

        mutations.clear();
        mutations.add(insertRecordMutation(3L, "FirstName3", "LastName3"));
        mutations.add(deleteRecordMutation(1L));
        final Timestamp t2 = databaseClient.write(mutations);
        LOG.debug("The second transaction committed with timestamp: {}", t2);

        mutations.clear();
        mutations.add(deleteRecordMutation(2L));
        mutations.add(deleteRecordMutation(3L));
        final Timestamp t3 = databaseClient.write(mutations);
        LOG.debug("The third transaction committed with timestamp: {}", t3);

        mutations.clear();
        mutations.add(deleteRecordMutation(0L));
        final Timestamp t4 = databaseClient.write(mutations);
        LOG.debug("The fourth transaction committed with timestamp: {}", t4);

        return t4;
    }

    /**
     * Builds an insert mutation for the test table that sets the {@code SingerId},
     * {@code FirstName} and {@code LastName} columns.
     */
    private static Mutation insertRecordMutation(long singerId, String firstName, String lastName) {
        return Mutation.newInsertBuilder(tableName)
            .set("SingerId").to(singerId)
            .set("FirstName").to(firstName)
            .set("LastName").to(lastName)
            .build();
    }

    /** Builds a delete mutation for the test table whose key set holds only the given singer id. */
    private static Mutation deleteRecordMutation(long singerId) {
        final Key singerKey = Key.of(singerId);
        final KeySet singleSinger = KeySet.newBuilder().addKey(singerKey).build();
        return Mutation.delete(tableName, singleSinger);
    }

    /**
     * Serializable key used to order change records: by commit timestamp first, then by
     * transaction id.
     *
     * <p>The ordering compares the timestamp's seconds and nanoseconds as integers. Folding them
     * into a single {@code double} loses sub-microsecond precision at current epoch values, which
     * can make distinct timestamps compare as equal and makes {@link #compareTo} disagree with
     * {@link #equals}.
     */
    private static class SortKey
    implements Serializable,
    Comparable<SortKey> {
        private static final long serialVersionUID = 2105939115467195036L;

        // Built once instead of on every comparison. Seconds, then nanos, then transaction id.
        private static final Comparator<SortKey> ORDERING =
            Comparator.comparingLong((SortKey key) -> key.getCommitTimestamp().getSeconds())
                .thenComparingInt(key -> key.getCommitTimestamp().getNanos())
                .thenComparing(SortKey::getTransactionId);

        private Timestamp commitTimestamp;
        private String transactionId;

        /** Creates an empty key; both fields stay {@code null} until set. */
        public SortKey() {
        }

        /**
         * @param commitTimestamp commit timestamp of the record's transaction
         * @param transactionId id of the record's transaction
         */
        public SortKey(Timestamp commitTimestamp, String transactionId) {
            this.commitTimestamp = commitTimestamp;
            this.transactionId = transactionId;
        }

        public static long getSerialVersionUID() {
            return serialVersionUID;
        }

        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        public void setCommitTimestamp(Timestamp commitTimestamp) {
            this.commitTimestamp = commitTimestamp;
        }

        public String getTransactionId() {
            return this.transactionId;
        }

        public void setTransactionId(String transactionId) {
            this.transactionId = transactionId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey) o;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp)
                && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        /** Readable form for the debug logs that print keys. */
        @Override
        public String toString() {
            return "SortKey{commitTimestamp=" + this.commitTimestamp + ", transactionId=" + this.transactionId + "}";
        }

        /**
         * Orders by commit timestamp (seconds, then nanoseconds) and breaks ties by transaction id.
         *
         * @throws NullPointerException if a compared field of either key is {@code null}
         */
        @Override
        public int compareTo(SortKey other) {
            return ORDERING.compare(this, other);
        }
    }

    /**
     * Turns each batch of keyed records into one string: the records are sorted by their
     * {@link SortKey} and their {@code getRecordString} renderings are concatenated.
     */
    private static class ToStringFn
    extends DoFn<Iterable<KV<SortKey, DataChangeRecord>>, String> {
        private static final long serialVersionUID = 2307936669684679038L;

        private ToStringFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Iterable<KV<SortKey, DataChangeRecord>> element, DoFn.OutputReceiver<String> outputReceiver) {
            final String rendered =
                StreamSupport.stream(element.spliterator(), false)
                    .sorted(Comparator.comparing((KV<SortKey, DataChangeRecord> keyed) -> keyed.getKey()))
                    .map(keyed -> SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.getRecordString(keyed.getValue()))
                    .collect(Collectors.joining());
            outputReceiver.output(rendered);
        }
    }

    /**
     * DoFn that buffers keyed records in bag state and emits them in batches from an event-time
     * timer.
     *
     * <p>Every incoming element is added to the buffer. The first time the {@code keySeen} state
     * is found unset, a timer is set at that element's commit timestamp plus the configured
     * interval. Each time the timer fires, the buffered records whose commit timestamp is strictly
     * before the firing timestamp are emitted together as one {@code Iterable}; the rest are put
     * back into the buffer. The timer then re-arms itself one interval later, unless the firing
     * timestamp has reached the pipeline end time.
     */
    private static class BufferRecordsUntilOutputTimestamp
    extends DoFn<KV<byte[], KV<SortKey, DataChangeRecord>>, Iterable<KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = 5050535558953049259L;
        // Distance, in seconds, between the first commit timestamp and the first timer, and between
        // consecutive timers.
        private final long incrementIntervalInSeconds;
        // Instant after which the timer is no longer re-armed; null means it is always re-armed.
        @Nullable
        private final Instant pipelineEndTime;
        // Event-time timer that drives the periodic flush in onExpiry.
        @DoFn.TimerId(value="timer")
        private final TimerSpec timerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
        // Records waiting to be emitted.
        @DoFn.StateId(value="buffer")
        private final StateSpec<BagState<KV<SortKey, DataChangeRecord>>> buffer = StateSpecs.bag();
        // Written once the initial timer has been set; read as null before that.
        @DoFn.StateId(value="keySeen")
        private final StateSpec<ValueState<Boolean>> keySeen = StateSpecs.value((Coder)BooleanCoder.of());

        /**
         * @param endTimestamp end of the read, converted to a joda Instant; may be null
         * @param incrementIntervalInSeconds spacing between timer firings, in seconds
         */
        private BufferRecordsUntilOutputTimestamp(@Nullable Timestamp endTimestamp, long incrementIntervalInSeconds) {
            this.incrementIntervalInSeconds = incrementIntervalInSeconds;
            this.pipelineEndTime = endTimestamp != null ? new Instant((Object)endTimestamp.toSqlTimestamp()) : null;
        }

        /**
         * Buffers the element's value and, if {@code keySeen} has not been written yet, sets the
         * first timer at the element's commit timestamp plus the interval.
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<byte[], KV<SortKey, DataChangeRecord>> element, @DoFn.StateId(value="buffer") BagState<KV<SortKey, DataChangeRecord>> buffer, @DoFn.TimerId(value="timer") Timer timer, @DoFn.StateId(value="keySeen") ValueState<Boolean> keySeen) {
            buffer.add((Object)((KV)element.getValue()));
            Boolean hasKeyBeenSeen = (Boolean)keySeen.read();
            // Only the first element arms the timer; afterwards onExpiry re-arms it.
            if (hasKeyBeenSeen == null) {
                Instant commitTimestamp = new Instant((Object)((SortKey)((KV)element.getValue()).getKey()).getCommitTimestamp().toSqlTimestamp());
                Instant outputTimestamp = commitTimestamp.plus((ReadableDuration)Duration.standardSeconds((long)this.incrementIntervalInSeconds));
                LOG.debug("Setting timer at {} for key {}", (Object)outputTimestamp.toString(), element.getKey());
                timer.set(outputTimestamp);
                keySeen.write((Object)true);
            }
        }

        /**
         * Timer callback: emits the buffered records committed strictly before the firing
         * timestamp as one batch, keeps the others buffered, and re-arms the timer one interval
         * later while the firing timestamp is before the pipeline end time (or always, when no end
         * time was given).
         */
        @DoFn.OnTimer(value="timer")
        public void onExpiry(DoFn.OnTimerContext context, @DoFn.StateId(value="buffer") BagState<KV<SortKey, DataChangeRecord>> buffer, @DoFn.TimerId(value="timer") Timer timer) {
            if (!((Boolean)buffer.isEmpty().read()).booleanValue()) {
                // Copy the buffer contents before clearing, then re-add whatever is not yet due.
                List records = StreamSupport.stream(buffer.read().spliterator(), false).collect(Collectors.toList());
                buffer.clear();
                ArrayList<KV> recordsToOutput = new ArrayList<KV>();
                for (KV record : records) {
                    Instant recordCommitTimestamp = new Instant((Object)((SortKey)record.getKey()).getCommitTimestamp().toSqlTimestamp());
                    String recordString = SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.getRecordString((DataChangeRecord)record.getValue());
                    // Strict comparison: a record committed exactly at the firing timestamp waits
                    // for the next firing.
                    if (recordCommitTimestamp.isBefore((ReadableInstant)context.timestamp())) {
                        LOG.debug("Outputting transactions {} with id {} at expiration timestamp {}", new Object[]{recordString, ((SortKey)record.getKey()).toString(), context.timestamp().toString()});
                        recordsToOutput.add(record);
                        continue;
                    }
                    LOG.debug("Expired at {} but adding transaction {} back to buffer due to commit timestamp {}", new Object[]{context.timestamp().toString(), recordString, recordCommitTimestamp.toString()});
                    buffer.add((Object)record);
                }
                if (!recordsToOutput.isEmpty()) {
                    // The whole batch is emitted as a single element stamped with the firing time.
                    context.outputWithTimestamp(recordsToOutput, context.timestamp());
                    LOG.debug("Expired at {}, outputting records for key {}", (Object)context.timestamp().toString(), (Object)((SortKey)((KV)recordsToOutput.get(0)).getKey()).toString());
                } else {
                    LOG.debug("Expired at {} with no records", (Object)context.timestamp().toString());
                }
            }
            // Re-arming happens whether or not the buffer held anything.
            Instant nextTimer = context.timestamp().plus((ReadableDuration)Duration.standardSeconds((long)this.incrementIntervalInSeconds));
            if (this.pipelineEndTime == null || context.timestamp().isBefore((ReadableInstant)this.pipelineEndTime)) {
                LOG.debug("Setting next timer to {}", (Object)nextTimer.toString());
                timer.set(nextTimer);
            } else {
                LOG.debug("Timer not being set as exceeded pipeline end time: " + this.pipelineEndTime.toString());
            }
        }
    }

    /**
     * Wraps every element in a {@code KV} whose key is an empty byte array, so that all elements
     * carry an identical key.
     */
    private static class CreateArtificialKeyFn
    extends DoFn<KV<SortKey, DataChangeRecord>, KV<byte[], KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = -3363057370822294686L;

        private CreateArtificialKeyFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<SortKey, DataChangeRecord> element, DoFn.OutputReceiver<KV<byte[], KV<SortKey, DataChangeRecord>>> outputReceiver) {
            final byte[] artificialKey = new byte[0];
            outputReceiver.output(KV.of(artificialKey, element));
        }
    }

    /**
     * Pairs each change record with a {@link SortKey} built from the record's commit timestamp and
     * server transaction id.
     */
    private static class KeyBySortKeyFn
    extends DoFn<DataChangeRecord, KV<SortKey, DataChangeRecord>> {
        private static final long serialVersionUID = 1270485392415293532L;

        private KeyBySortKeyFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<KV<SortKey, DataChangeRecord>> outputReceiver) {
            final SortKey sortKey = new SortKey(record.getCommitTimestamp(), record.getServerTransactionId());
            outputReceiver.output(KV.of(sortKey, record));
        }
    }
}

