package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT.class */
public class SpannerChangeStreamOrderedWithinKeyGloballyIT {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamOrderedWithinKeyGloballyIT.class);

    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    private static String tableName;
    private static String changeStreamName;
    private static DatabaseClient databaseClient;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$BreakRecordByModFn.class */
    private static class BreakRecordByModFn extends DoFn<DataChangeRecord, DataChangeRecord> {
        private static final long serialVersionUID = 543765692722095666L;

        private BreakRecordByModFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<DataChangeRecord> outputReceiver) {
            Stream map = dataChangeRecord.getMods().stream().map(mod -> {
                return new DataChangeRecord(dataChangeRecord.getPartitionToken(), dataChangeRecord.getCommitTimestamp(), dataChangeRecord.getServerTransactionId(), dataChangeRecord.isLastRecordInTransactionInPartition(), dataChangeRecord.getRecordSequence(), dataChangeRecord.getTableName(), dataChangeRecord.getRowType(), Collections.singletonList(mod), dataChangeRecord.getModType(), dataChangeRecord.getValueCaptureType(), dataChangeRecord.getNumberOfRecordsInTransaction(), dataChangeRecord.getNumberOfPartitionsInTransaction(), dataChangeRecord.getMetadata());
            });
            Objects.requireNonNull(outputReceiver);
            map.forEach((v1) -> {
                r1.output(v1);
            });
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$BufferKeyUntilOutputTimestamp.class */
    private static class BufferKeyUntilOutputTimestamp extends DoFn<KV<String, KV<SortKey, DataChangeRecord>>, KV<String, Iterable<KV<SortKey, DataChangeRecord>>>> {
        private static final long serialVersionUID = 5050535558953049259L;
        private final long incrementIntervalInSeconds;

        @DoFn.TimerId("timer")
        private final TimerSpec timerSpec;

        @DoFn.StateId("buffer")
        private final StateSpec<BagState<KV<SortKey, DataChangeRecord>>> buffer;

        @DoFn.StateId("seenKey")
        private final StateSpec<ValueState<String>> seenKey;

        private BufferKeyUntilOutputTimestamp(long j) {
            this.timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
            this.buffer = StateSpecs.bag();
            this.seenKey = StateSpecs.value(StringUtf8Coder.of());
            this.incrementIntervalInSeconds = j;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<String, KV<SortKey, DataChangeRecord>> kv, @DoFn.StateId("buffer") BagState<KV<SortKey, DataChangeRecord>> bagState, @DoFn.TimerId("timer") Timer timer, @DoFn.StateId("seenKey") ValueState<String> valueState) {
            bagState.add((KV) kv.getValue());
            if (((String) valueState.read()) == null) {
                Instant plus = new Instant(((DataChangeRecord) ((KV) kv.getValue()).getValue()).getCommitTimestamp().toSqlTimestamp()).plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
                SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Setting timer at {} for key {}", plus.toString(), kv.getKey());
                timer.set(plus);
                valueState.write((String) kv.getKey());
            }
        }

        @DoFn.OnTimer("timer")
        public void onExpiry(DoFn<KV<String, KV<SortKey, DataChangeRecord>>, KV<String, Iterable<KV<SortKey, DataChangeRecord>>>>.OnTimerContext onTimerContext, @DoFn.StateId("buffer") BagState<KV<SortKey, DataChangeRecord>> bagState, @DoFn.TimerId("timer") Timer timer, @DoFn.StateId("seenKey") ValueState<String> valueState) {
            String str = (String) valueState.read();
            Instant timestamp = onTimerContext.timestamp();
            SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Timer reached expiration time for key {} and for timestamp {}", str, timestamp);
            if (!((Boolean) bagState.isEmpty().read()).booleanValue()) {
                List<KV> list = (List) StreamSupport.stream(bagState.read().spliterator(), false).collect(Collectors.toList());
                bagState.clear();
                ArrayList arrayList = new ArrayList();
                for (KV kv : list) {
                    Instant instant = new Instant(((DataChangeRecord) kv.getValue()).getCommitTimestamp().toSqlTimestamp());
                    String newValuesJson = ((Mod) ((DataChangeRecord) kv.getValue()).getMods().get(0)).getNewValuesJson().isEmpty() ? "Deleted record" : ((Mod) ((DataChangeRecord) kv.getValue()).getMods().get(0)).getNewValuesJson();
                    if (instant.isBefore(timestamp)) {
                        SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Outputting record with key {} and value \"{}\" at expiration timestamp {}", new Object[]{((Mod) ((DataChangeRecord) kv.getValue()).getMods().get(0)).getKeysJson(), newValuesJson, timestamp.toString()});
                        arrayList.add(kv);
                    } else {
                        SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Expired at {} but adding record with key {} and value {} back to buffer due to commit timestamp {}", new Object[]{timestamp.toString(), ((Mod) ((DataChangeRecord) kv.getValue()).getMods().get(0)).getKeysJson(), newValuesJson, instant.toString()});
                        bagState.add(kv);
                    }
                }
                if (arrayList.isEmpty()) {
                    SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Expired at {} with no records", timestamp.toString());
                } else {
                    onTimerContext.outputWithTimestamp(KV.of(((Mod) ((DataChangeRecord) ((KV) arrayList.get(0)).getValue()).getMods().get(0)).getKeysJson(), arrayList), timestamp);
                    SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Expired at {}, outputting records for key and context timestamp {}", timestamp.toString(), ((Mod) ((DataChangeRecord) ((KV) arrayList.get(0)).getValue()).getMods().get(0)).getKeysJson());
                }
            }
            Instant plus = timestamp.plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
            if (bagState.isEmpty() == null || ((Boolean) bagState.isEmpty().read()).booleanValue()) {
                SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Timer not being set since the buffer is empty for key {} ", str);
                valueState.clear();
            } else {
                SpannerChangeStreamOrderedWithinKeyGloballyIT.LOG.info("Setting next timer to {} for key {}", plus.toString(), str);
                timer.set(plus);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$KeyByIdFn.class */
    private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>> {
        private static final long serialVersionUID = 5121794565566403405L;

        private KeyByIdFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
            outputReceiver.output(KV.of(((Mod) dataChangeRecord.getMods().get(0)).getKeysJson(), dataChangeRecord));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$KeyValueByCommitTimestampAndTransactionIdFn.class */
    private static class KeyValueByCommitTimestampAndTransactionIdFn<K> extends DoFn<KV<K, DataChangeRecord>, KV<K, KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = -4059137464869088056L;

        private KeyValueByCommitTimestampAndTransactionIdFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<K, DataChangeRecord> kv, DoFn.OutputReceiver<KV<K, KV<SortKey, DataChangeRecord>>> outputReceiver) {
            Object key = kv.getKey();
            DataChangeRecord dataChangeRecord = (DataChangeRecord) kv.getValue();
            outputReceiver.output(KV.of(key, KV.of(new SortKey(dataChangeRecord.getCommitTimestamp(), dataChangeRecord.getServerTransactionId()), dataChangeRecord)));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$SortKey.class */
    public static class SortKey implements Serializable, Comparable<SortKey> {
        private static final long serialVersionUID = 6923689796764239980L;
        private Timestamp commitTimestamp;
        private String transactionId;

        public SortKey() {
        }

        public SortKey(Timestamp timestamp, String str) {
            this.commitTimestamp = timestamp;
            this.transactionId = str;
        }

        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        public void setCommitTimestamp(Timestamp timestamp) {
            this.commitTimestamp = timestamp;
        }

        public String getTransactionId() {
            return this.transactionId;
        }

        public void setTransactionId(String str) {
            this.transactionId = str;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey) obj;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp) && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        @Override // java.lang.Comparable
        public int compareTo(SortKey sortKey) {
            return Comparator.comparingDouble(sortKey2 -> {
                return sortKey2.getCommitTimestamp().getSeconds() + (sortKey2.getCommitTimestamp().getNanos() / 1.0E9d);
            }).thenComparing(sortKey3 -> {
                return sortKey3.getTransactionId();
            }).compare(this, sortKey);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedWithinKeyGloballyIT$ToStringFn.class */
    private static class ToStringFn extends DoFn<KV<String, Iterable<KV<SortKey, DataChangeRecord>>>, String> {
        private static final long serialVersionUID = -2573561902102768101L;

        private ToStringFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, Iterable<KV<SortKey, DataChangeRecord>>> kv, DoFn.OutputReceiver<String> outputReceiver) {
            List<KV> list = (List) StreamSupport.stream(((Iterable) kv.getValue()).spliterator(), false).sorted(Comparator.comparing((v0) -> {
                return v0.getKey();
            })).collect(Collectors.toList());
            StringBuilder sb = new StringBuilder();
            sb.append((String) kv.getKey());
            sb.append("\n");
            for (KV kv2 : list) {
                sb.append(((Mod) ((DataChangeRecord) kv2.getValue()).getMods().get(0)).getNewValuesJson().isEmpty() ? "Deleted record;" : ((Mod) ((DataChangeRecord) kv2.getValue()).getMods().get(0)).getNewValuesJson() + ";");
            }
            outputReceiver.output(sb.toString());
        }
    }

    @BeforeClass
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();
        tableName = ENV.createSingersTable();
        changeStreamName = ENV.createChangeStreamFor(tableName);
        databaseClient = ENV.getDatabaseClient();
    }

    @Test
    @Ignore
    public void testOrderedWithinKey() {
        SpannerConfig withDatabaseId = SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId);
        ArrayList arrayList = new ArrayList();
        arrayList.add(insertRecordMutation(0L));
        Timestamp write = databaseClient.write(arrayList);
        writeTransactionsToDatabase();
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            LOG.error(e.toString(), e);
        }
        writeTransactionsToDatabase();
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e2) {
            LOG.error(e2.toString(), e2);
        }
        Timestamp writeTransactionsToDatabase = writeTransactionsToDatabase();
        LOG.info("Reading change streams from {} to {}", write.toString(), writeTransactionsToDatabase.toString());
        PAssert.that(this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(withDatabaseId).withChangeStreamName(changeStreamName).withMetadataDatabase(databaseId).withInclusiveStartAt(write).withInclusiveEndAt(writeTransactionsToDatabase)).apply(ParDo.of(new BreakRecordByModFn())).apply(ParDo.of(new KeyByIdFn())).apply(ParDo.of(new KeyValueByCommitTimestampAndTransactionIdFn())).apply(ParDo.of(new BufferKeyUntilOutputTimestamp(2L))).apply(ParDo.of(new ToStringFn()))).containsInAnyOrder(new String[]{"{\"SingerId\":\"0\"}\n{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};Deleted record;", "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};Deleted record;"});
        this.pipeline.runWithAdditionalOptionArgs(Collections.singletonList("--streaming")).waitUntilFinish();
    }

    private Timestamp writeTransactionsToDatabase() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(insertRecordMutation(1L));
        arrayList.add(insertRecordMutation(2L));
        LOG.info("The first transaction committed with timestamp: " + databaseClient.write(arrayList).toString());
        arrayList.clear();
        arrayList.add(insertRecordMutation(3L));
        arrayList.add(deleteRecordMutation(1L));
        LOG.info("The second transaction committed with timestamp: " + databaseClient.write(arrayList).toString());
        arrayList.clear();
        arrayList.add(deleteRecordMutation(2L));
        arrayList.add(deleteRecordMutation(3L));
        LOG.info("The third transaction committed with timestamp: " + databaseClient.write(arrayList).toString());
        arrayList.clear();
        arrayList.add(deleteRecordMutation(0L));
        Timestamp write = databaseClient.write(arrayList);
        LOG.info("The fourth transaction committed with timestamp: " + write.toString());
        return write;
    }

    private static Mutation insertRecordMutation(long j) {
        return ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertBuilder(tableName).set("SingerId").to(j)).set("FirstName").to("Inserting mutation " + j)).build();
    }

    private static Mutation deleteRecordMutation(long j) {
        return Mutation.delete(tableName, KeySet.newBuilder().addKey(Key.of(new Object[]{Long.valueOf(j)})).build());
    }
}
