/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.it.IntegrationTestEnv;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
    /** Logger used by the test methods and by the nested DoFns of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamOrderedWithinKeyGloballyIT.class);
    /** Class-level JUnit rule; {@link #setup()} reads the ids, table, change stream and client from it. */
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();
    /** Per-test Beam pipeline rule; marked transient so it is excluded from Java serialization. */
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    // The fields below are assigned once in setup() and read by the test and the mutation helpers.
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    /** Name returned by {@code ENV.createSingersTable()}; target of the insert/delete mutations. */
    private static String tableName;
    /** Name returned by {@code ENV.createChangeStreamFor(tableName)}; read by the pipeline under test. */
    private static String changeStreamName;
    /** Client used to commit the test mutations. */
    private static DatabaseClient databaseClient;

    /**
     * Populates the static test fixtures from {@link #ENV} before any test runs.
     *
     * <p>The singers table is created first because its name is the argument used to create the
     * change stream.
     *
     * @throws InterruptedException declared for the environment calls made here
     * @throws ExecutionException declared for the environment calls made here
     * @throws TimeoutException declared for the environment calls made here
     */
    @BeforeClass
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();

        final String singersTable = ENV.createSingersTable();
        tableName = singersTable;
        changeStreamName = ENV.createChangeStreamFor(singersTable);

        databaseClient = ENV.getDatabaseClient();
    }

    /**
     * Verifies that change stream records can be re-ordered per key by commit timestamp.
     *
     * <p>The test commits an initial insert for singer 0 (its commit timestamp is the inclusive
     * start of the read), then writes three batches of transactions separated by pauses of
     * {@code timeIncrementInSeconds}. The commit timestamp of the last batch is the inclusive end
     * of the read. The pipeline splits each record per mod, keys it by the mod's key JSON, attaches
     * a {@link SortKey}, buffers per key until the event-time timer fires and finally renders each
     * emitted group as a string that is compared against the expected insert/delete sequences.
     *
     * <p>Currently disabled via {@code @Ignore}.
     */
    @Ignore
    @Test
    public void testOrderedWithinKey() {
        final SpannerConfig spannerConfig = SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId);
        // Single source of truth for both the pause between batches and the buffering interval.
        final long timeIncrementInSeconds = 2L;

        // Commit an initial transaction; its timestamp is where the change stream read starts.
        final List<Mutation> mutations = new ArrayList<>();
        mutations.add(insertRecordMutation(0L));
        final Timestamp startTimestamp = databaseClient.write(mutations);

        // Three batches, each separated by one time increment.
        writeTransactionsToDatabase();
        pauseForSeconds(timeIncrementInSeconds);
        writeTransactionsToDatabase();
        pauseForSeconds(timeIncrementInSeconds);
        final Timestamp endTimestamp = writeTransactionsToDatabase();

        LOG.info("Reading change streams from {} to {}", startTimestamp.toString(), endTimestamp.toString());

        final PCollection<String> tokens = pipeline
            .apply(SpannerIO.readChangeStream()
                .withSpannerConfig(spannerConfig)
                .withChangeStreamName(changeStreamName)
                .withMetadataDatabase(databaseId)
                .withInclusiveStartAt(startTimestamp)
                .withInclusiveEndAt(endTimestamp))
            .apply(ParDo.of(new BreakRecordByModFn()))
            .apply(ParDo.of(new KeyByIdFn()))
            .apply(ParDo.of(new KeyValueByCommitTimestampAndTransactionIdFn<>()))
            .apply(ParDo.of(new BufferKeyUntilOutputTimestamp(timeIncrementInSeconds)))
            .apply(ParDo.of(new ToStringFn()));

        // Singer 0 is inserted once; singers 1-3 are inserted and deleted in each of the three batches.
        PAssert.that(tokens).containsInAnyOrder(
            "{\"SingerId\":\"0\"}\n{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;",
            "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;");

        pipeline.runWithAdditionalOptionArgs(Collections.singletonList("--streaming")).waitUntilFinish();
    }

    /**
     * Sleeps for the given number of seconds.
     *
     * <p>If the thread is interrupted the error is logged, as before, and the interrupt flag is
     * restored so that callers further up the stack can still observe the interruption.
     *
     * @param seconds how long to pause, in seconds
     */
    private static void pauseForSeconds(long seconds) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error(e.toString(), e);
        }
    }

    /**
     * Commits four consecutive transactions against the singers table and logs each commit
     * timestamp.
     *
     * <ol>
     *   <li>insert singers 1 and 2</li>
     *   <li>insert singer 3, delete singer 1</li>
     *   <li>delete singers 2 and 3</li>
     *   <li>delete singer 0</li>
     * </ol>
     *
     * @return the commit timestamp of the fourth (last) transaction
     */
    private Timestamp writeTransactionsToDatabase() {
        // The list is reused across transactions; it is cleared after each commit.
        final List<Mutation> mutations = new ArrayList<>();

        mutations.add(insertRecordMutation(1L));
        mutations.add(insertRecordMutation(2L));
        final Timestamp t1 = databaseClient.write(mutations);
        LOG.info("The first transaction committed with timestamp: {}", t1);
        mutations.clear();

        mutations.add(insertRecordMutation(3L));
        mutations.add(deleteRecordMutation(1L));
        final Timestamp t2 = databaseClient.write(mutations);
        LOG.info("The second transaction committed with timestamp: {}", t2);
        mutations.clear();

        mutations.add(deleteRecordMutation(2L));
        mutations.add(deleteRecordMutation(3L));
        final Timestamp t3 = databaseClient.write(mutations);
        LOG.info("The third transaction committed with timestamp: {}", t3);
        mutations.clear();

        mutations.add(deleteRecordMutation(0L));
        final Timestamp t4 = databaseClient.write(mutations);
        LOG.info("The fourth transaction committed with timestamp: {}", t4);
        return t4;
    }

    /**
     * Builds an insert mutation for the singers table.
     *
     * @param singerId value stored in the {@code SingerId} column and embedded in {@code FirstName}
     * @return an insert mutation setting {@code SingerId} and {@code FirstName}
     */
    private static Mutation insertRecordMutation(long singerId) {
        final String firstName = "Inserting mutation " + singerId;
        return Mutation.newInsertBuilder(tableName)
            .set("SingerId").to(singerId)
            .set("FirstName").to(firstName)
            .build();
    }

    /**
     * Builds a delete mutation for the singers table.
     *
     * @param singerId the single key part identifying the row to delete
     * @return a delete mutation whose key set contains exactly that key
     */
    private static Mutation deleteRecordMutation(long singerId) {
        final Key singerKey = Key.of(singerId);
        final KeySet keysToDelete = KeySet.newBuilder().addKey(singerKey).build();
        return Mutation.delete(tableName, keysToDelete);
    }

    /**
     * Serializable ordering key for a change record: commit timestamp first, transaction id as the
     * tie-breaker.
     *
     * <p>The natural ordering compares the commit timestamps directly instead of collapsing seconds
     * and nanoseconds into a {@code double}, which cannot represent nanosecond resolution for
     * present-day epoch values and could therefore report distinct timestamps as equal.
     */
    public static class SortKey
    implements Serializable,
    Comparable<SortKey> {
        private static final long serialVersionUID = 6923689796764239980L;

        /** Natural ordering: commit timestamp, then transaction id. Static, so never serialized. */
        private static final Comparator<SortKey> NATURAL_ORDER =
            Comparator.comparing(SortKey::getCommitTimestamp).thenComparing(SortKey::getTransactionId);

        private Timestamp commitTimestamp;
        private String transactionId;

        /** Creates an empty key; both fields stay {@code null} until set. */
        public SortKey() {
        }

        /**
         * @param commitTimestamp commit timestamp of the record
         * @param transactionId id of the transaction that produced the record
         */
        public SortKey(Timestamp commitTimestamp, String transactionId) {
            this.commitTimestamp = commitTimestamp;
            this.transactionId = transactionId;
        }

        /** @return the commit timestamp, possibly {@code null} if never set */
        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        /** @param commitTimestamp the new commit timestamp */
        public void setCommitTimestamp(Timestamp commitTimestamp) {
            this.commitTimestamp = commitTimestamp;
        }

        /** @return the transaction id, possibly {@code null} if never set */
        public String getTransactionId() {
            return this.transactionId;
        }

        /** @param transactionId the new transaction id */
        public void setTransactionId(String transactionId) {
            this.transactionId = transactionId;
        }

        /** Two keys are equal when they have the same class, commit timestamp and transaction id. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey)o;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp) && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        /**
         * Orders by commit timestamp, then by transaction id.
         *
         * @param other the key to compare against
         * @return negative, zero or positive per the {@link Comparable} contract
         * @throws NullPointerException if a compared commit timestamp or transaction id is {@code null}
         */
        @Override
        public int compareTo(SortKey other) {
            return NATURAL_ORDER.compare(this, other);
        }
    }

    /**
     * Renders the records emitted for one key as a single string.
     *
     * <p>The output is the key, a newline, and then for every record (sorted by {@link SortKey})
     * either the new-values JSON of its first mod followed by {@code ;} or the literal
     * {@code Deleted record;} when that JSON is empty.
     */
    private static class ToStringFn
    extends DoFn<KV<String, Iterable<KV<SortKey, DataChangeRecord>>>, String> {
        private static final long serialVersionUID = -2573561902102768101L;

        private ToStringFn() {
        }

        /**
         * @param recordsByKey the key and the records buffered for it
         * @param outputReceiver receives the rendered string
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, Iterable<KV<SortKey, DataChangeRecord>>> recordsByKey, DoFn.OutputReceiver<String> outputReceiver) {
            final List<KV<SortKey, DataChangeRecord>> sortedRecords =
                StreamSupport.stream(recordsByKey.getValue().spliterator(), false)
                    .sorted(Comparator.comparing((KV<SortKey, DataChangeRecord> kv) -> kv.getKey()))
                    .collect(Collectors.toList());

            final StringBuilder builder = new StringBuilder();
            builder.append(recordsByKey.getKey());
            builder.append("\n");
            for (KV<SortKey, DataChangeRecord> record : sortedRecords) {
                // Only the first mod is inspected; an empty new-values JSON is rendered as a delete.
                final String newValuesJson = record.getValue().getMods().get(0).getNewValuesJson();
                builder.append(newValuesJson.isEmpty() ? "Deleted record;" : newValuesJson + ";");
            }
            outputReceiver.output(builder.toString());
        }
    }

    /**
     * Stateful DoFn that buffers the records of a key and releases them when an event-time timer
     * fires.
     *
     * <p>For the first record seen for a key, a timer is set {@code incrementIntervalInSeconds}
     * after that record's commit timestamp. When the timer fires, every buffered record whose
     * commit timestamp is strictly before the timer's timestamp is emitted as one group; the
     * remaining records go back into the buffer. The timer is re-armed one interval later while the
     * buffer is non-empty; otherwise the "seen" marker is cleared.
     */
    private static class BufferKeyUntilOutputTimestamp
    extends DoFn<KV<String, KV<SortKey, DataChangeRecord>>, KV<String, Iterable<KV<SortKey, DataChangeRecord>>>> {
        private static final long serialVersionUID = 5050535558953049259L;
        private final long incrementIntervalInSeconds;
        @DoFn.TimerId(value="timer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
        @DoFn.StateId(value="buffer")
        private final StateSpec<BagState<KV<SortKey, DataChangeRecord>>> buffer = StateSpecs.bag();
        @DoFn.StateId(value="seenKey")
        private final StateSpec<ValueState<String>> seenKey = StateSpecs.value(StringUtf8Coder.of());

        /**
         * @param incrementIntervalInSeconds distance, in seconds, between a timer and the timestamp
         *     it is derived from (first commit timestamp or previous timer)
         */
        private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
            this.incrementIntervalInSeconds = incrementIntervalInSeconds;
        }

        /**
         * Buffers the incoming record and, if no timer is pending for the key, sets one.
         *
         * @param element the key and its (sort key, record) pair
         * @param buffer per-key bag of pending records
         * @param timer per-key event-time timer
         * @param seenKey holds the key while a timer is pending for it
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<String, KV<SortKey, DataChangeRecord>> element, @DoFn.StateId(value="buffer") BagState<KV<SortKey, DataChangeRecord>> buffer, @DoFn.TimerId(value="timer") Timer timer, @DoFn.StateId(value="seenKey") ValueState<String> seenKey) {
            buffer.add(element.getValue());
            if (seenKey.read() == null) {
                final Instant commitTimestamp = commitInstantOf(element.getValue().getValue());
                final Instant outputTimestamp = commitTimestamp.plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
                LOG.info("Setting timer at {} for key {}", outputTimestamp.toString(), element.getKey());
                timer.set(outputTimestamp);
                seenKey.write(element.getKey());
            }
        }

        /**
         * Emits the buffered records that committed before the timer's timestamp and re-buffers the
         * rest, then either re-arms the timer or clears the "seen" marker.
         *
         * @param context timer context; supplies the firing timestamp and the output
         * @param buffer per-key bag of pending records
         * @param timer per-key event-time timer
         * @param seenKey holds the key while a timer is pending for it
         */
        @DoFn.OnTimer(value="timer")
        public void onExpiry(DoFn.OnTimerContext context, @DoFn.StateId(value="buffer") BagState<KV<SortKey, DataChangeRecord>> buffer, @DoFn.TimerId(value="timer") Timer timer, @DoFn.StateId(value="seenKey") ValueState<String> seenKey) {
            final String keyForTimer = seenKey.read();
            final Instant timerContextTimestamp = context.timestamp();
            LOG.info("Timer reached expiration time for key {} and for timestamp {}", keyForTimer, timerContextTimestamp);
            if (!buffer.isEmpty().read()) {
                // Snapshot the bag before clearing it; records that are not yet due are added back below.
                final List<KV<SortKey, DataChangeRecord>> records = StreamSupport.stream(buffer.read().spliterator(), false).collect(Collectors.toList());
                buffer.clear();
                final List<KV<SortKey, DataChangeRecord>> recordsToOutput = new ArrayList<>();
                for (KV<SortKey, DataChangeRecord> record : records) {
                    final Instant recordCommitTimestamp = commitInstantOf(record.getValue());
                    final String keysJson = firstMod(record).getKeysJson();
                    final String newValuesJson = firstMod(record).getNewValuesJson();
                    final String recordString = newValuesJson.isEmpty() ? "Deleted record" : newValuesJson;
                    if (recordCommitTimestamp.isBefore(timerContextTimestamp)) {
                        LOG.info("Outputting record with key {} and value \"{}\" at expiration timestamp {}", keysJson, recordString, timerContextTimestamp.toString());
                        recordsToOutput.add(record);
                    } else {
                        LOG.info("Expired at {} but adding record with key {} and value {} back to buffer due to commit timestamp {}", timerContextTimestamp.toString(), keysJson, recordString, recordCommitTimestamp.toString());
                        buffer.add(record);
                    }
                }
                if (!recordsToOutput.isEmpty()) {
                    // The output key is taken from the first mod of the first emitted record.
                    final String outputKey = firstMod(recordsToOutput.get(0)).getKeysJson();
                    final KV<String, Iterable<KV<SortKey, DataChangeRecord>>> grouped = KV.of(outputKey, recordsToOutput);
                    context.outputWithTimestamp(grouped, timerContextTimestamp);
                    LOG.info("Expired at {}, outputting records for key and context timestamp {}", timerContextTimestamp.toString(), outputKey);
                } else {
                    LOG.info("Expired at {} with no records", timerContextTimestamp.toString());
                }
            }
            final Instant nextTimer = timerContextTimestamp.plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
            if (!buffer.isEmpty().read()) {
                LOG.info("Setting next timer to {} for key {}", nextTimer.toString(), keyForTimer);
                timer.set(nextTimer);
            } else {
                LOG.info("Timer not being set since the buffer is empty for key {} ", keyForTimer);
                seenKey.clear();
            }
        }

        /** Converts the record's commit timestamp to a Joda {@link Instant} via its SQL timestamp form. */
        private static Instant commitInstantOf(DataChangeRecord record) {
            return new Instant(record.getCommitTimestamp().toSqlTimestamp());
        }

        /** Returns the first mod of the buffered record; only that mod is used for keys and values. */
        private static Mod firstMod(KV<SortKey, DataChangeRecord> record) {
            return record.getValue().getMods().get(0);
        }
    }

    /** Keys each record by the keys JSON of its first mod. */
    private static class KeyByIdFn
    extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
        private static final long serialVersionUID = 5121794565566403405L;

        private KeyByIdFn() {
        }

        /**
         * @param record the record to key
         * @param outputReceiver receives the (keys JSON, record) pair
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
            final Mod firstMod = record.getMods().get(0);
            final String keysJson = firstMod.getKeysJson();
            outputReceiver.output(KV.of(keysJson, record));
        }
    }

    /**
     * Wraps the value of each keyed record in a pair whose first component is a {@link SortKey}
     * built from the record's commit timestamp and server transaction id.
     *
     * @param <K> type of the incoming key, passed through unchanged
     */
    private static class KeyValueByCommitTimestampAndTransactionIdFn<K>
    extends DoFn<KV<K, DataChangeRecord>, KV<K, KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = -4059137464869088056L;

        private KeyValueByCommitTimestampAndTransactionIdFn() {
        }

        /**
         * @param recordByKey the keyed record
         * @param outputReceiver receives the key paired with (sort key, record)
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, DataChangeRecord> recordByKey, DoFn.OutputReceiver<KV<K, KV<SortKey, DataChangeRecord>>> outputReceiver) {
            final K key = recordByKey.getKey();
            final DataChangeRecord record = recordByKey.getValue();
            final SortKey sortKey = new SortKey(record.getCommitTimestamp(), record.getServerTransactionId());
            outputReceiver.output(KV.of(key, KV.of(sortKey, record)));
        }
    }

    /**
     * Splits a record into one record per mod: every output copies the input's fields and carries
     * a singleton list holding exactly one of the input's mods.
     */
    private static class BreakRecordByModFn
    extends DoFn<DataChangeRecord, DataChangeRecord> {
        private static final long serialVersionUID = 543765692722095666L;

        private BreakRecordByModFn() {
        }

        /**
         * @param record the record to split
         * @param outputReceiver receives one single-mod record per mod, in the order of the mod list
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord record, DoFn.OutputReceiver<DataChangeRecord> outputReceiver) {
            for (Mod mod : record.getMods()) {
                final DataChangeRecord singleModRecord = new DataChangeRecord(
                    record.getPartitionToken(),
                    record.getCommitTimestamp(),
                    record.getServerTransactionId(),
                    record.isLastRecordInTransactionInPartition(),
                    record.getRecordSequence(),
                    record.getTableName(),
                    record.getRowType(),
                    Collections.singletonList(mod),
                    record.getModType(),
                    record.getValueCaptureType(),
                    record.getNumberOfRecordsInTransaction(),
                    record.getNumberOfPartitionsInTransaction(),
                    record.getMetadata());
                outputReceiver.output(singleModRecord);
            }
        }
    }
}

