package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class */
public class SpannerChangeStreamOrderedByTimestampAndTransactionIdIT {
    // Logger shared by the test and by the nested DoFns, which reference it through the outer class name.
    private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.class);

    // Class-level JUnit rule. setup() reads the project/instance/database ids and the database
    // client from it, and uses it to create the Singers table and its change stream.
    @ClassRule
    public static final IntegrationTestEnv ENV = new IntegrationTestEnv();

    // Per-test Beam pipeline; transient so it is excluded from Java serialization.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    // The following static fields are all assigned once in setup() (@BeforeClass).
    private static String projectId;
    private static String instanceId;
    private static String databaseId;
    // Table targeted by insertRecordMutation/deleteRecordMutation.
    private static String tableName;
    // Change stream created for tableName; read by the pipeline under test.
    private static String changeStreamName;
    // Client used to commit the test mutations.
    private static DatabaseClient databaseClient;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT$BufferRecordsUntilOutputTimestamp.class */
    /**
     * Stateful {@link DoFn} that collects keyed change records in a bag and emits them in batches
     * each time an event-time timer fires.
     *
     * <p>The first element seen for a key arms the timer at that element's commit timestamp plus
     * the configured interval. On every firing, buffered records whose commit timestamp is strictly
     * before the firing timestamp are emitted together as one {@code Iterable}; the remaining
     * records are put back into the bag. The timer then re-arms itself one interval later, unless
     * an end time was supplied and the firing timestamp is no longer before it.
     */
    private static class BufferRecordsUntilOutputTimestamp extends DoFn<KV<byte[], KV<SortKey, DataChangeRecord>>, Iterable<KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = 5050535558953049259L;

        /** Distance, in seconds, between two consecutive timer firings. */
        private final long incrementIntervalInSeconds;

        /** Firing timestamps at or after this instant do not re-arm the timer; {@code null} means never stop. */
        @Nullable
        private final Instant pipelineEndTime;

        @DoFn.TimerId("timer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @DoFn.StateId("buffer")
        private final StateSpec<BagState<KV<SortKey, DataChangeRecord>>> buffer = StateSpecs.bag();

        /** Written once per key, when the initial timer is armed. */
        @DoFn.StateId("keySeen")
        private final StateSpec<ValueState<Boolean>> keySeen = StateSpecs.value(BooleanCoder.of());

        /**
         * @param timestamp optional end time; when {@code null} the timer keeps re-arming
         * @param j interval between timer firings, in seconds
         */
        private BufferRecordsUntilOutputTimestamp(@Nullable Timestamp timestamp, long j) {
            this.incrementIntervalInSeconds = j;
            this.pipelineEndTime = (timestamp == null) ? null : new Instant(timestamp.toSqlTimestamp());
        }

        /**
         * Buffers the incoming record and, for the first record of a key, arms the timer one
         * interval after that record's commit timestamp.
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<byte[], KV<SortKey, DataChangeRecord>> kv, @DoFn.StateId("buffer") BagState<KV<SortKey, DataChangeRecord>> bagState, @DoFn.TimerId("timer") Timer timer, @DoFn.StateId("keySeen") ValueState<Boolean> valueState) {
            bagState.add(kv.getValue());
            if (valueState.read() != null) {
                // The timer for this key is already running.
                return;
            }
            final Instant commitTime = new Instant(kv.getValue().getKey().getCommitTimestamp().toSqlTimestamp());
            final Instant firstFiring = commitTime.plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
            SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Setting timer at {} for key {}", firstFiring.toString(), kv.getKey());
            timer.set(firstFiring);
            valueState.write(true);
        }

        /**
         * Emits every buffered record committed strictly before the firing timestamp, re-buffers
         * the rest, and schedules the next firing when the end time has not been reached.
         */
        @DoFn.OnTimer("timer")
        public void onExpiry(DoFn<KV<byte[], KV<SortKey, DataChangeRecord>>, Iterable<KV<SortKey, DataChangeRecord>>>.OnTimerContext onTimerContext, @DoFn.StateId("buffer") BagState<KV<SortKey, DataChangeRecord>> bagState, @DoFn.TimerId("timer") Timer timer) {
            final Instant firedAt = onTimerContext.timestamp();

            if (!bagState.isEmpty().read()) {
                // Snapshot the bag before clearing it; records that are not yet due are re-added below.
                final List<KV<SortKey, DataChangeRecord>> pending = StreamSupport.stream(bagState.read().spliterator(), false).collect(Collectors.toList());
                bagState.clear();

                final List<KV<SortKey, DataChangeRecord>> ready = new ArrayList<>();
                for (KV<SortKey, DataChangeRecord> entry : pending) {
                    final Instant commitTime = new Instant(entry.getKey().getCommitTimestamp().toSqlTimestamp());
                    final String description = SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.getRecordString(entry.getValue());
                    if (!commitTime.isBefore(firedAt)) {
                        // Committed at or after this firing: keep it for a later one.
                        SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Expired at {} but adding transaction {} back to buffer due to commit timestamp {}", firedAt.toString(), description, commitTime.toString());
                        bagState.add(entry);
                        continue;
                    }
                    SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Outputting transactions {} with id {} at expiration timestamp {}", description, entry.getKey().toString(), firedAt.toString());
                    ready.add(entry);
                }

                if (!ready.isEmpty()) {
                    onTimerContext.outputWithTimestamp(ready, firedAt);
                    SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Expired at {}, outputting records for key {}", firedAt.toString(), ready.get(0).getKey().toString());
                } else {
                    SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Expired at {} with no records", firedAt.toString());
                }
            }

            final Instant nextFiring = firedAt.plus(Duration.standardSeconds(this.incrementIntervalInSeconds));
            final boolean endReached = this.pipelineEndTime != null && !firedAt.isBefore(this.pipelineEndTime);
            if (endReached) {
                SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Timer not being set as exceeded pipeline end time: " + this.pipelineEndTime.toString());
                return;
            }
            SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.LOG.debug("Setting next timer to {}", nextFiring.toString());
            timer.set(nextFiring);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT$CreateArtificialKeyFn.class */
    /**
     * Wraps each sort-keyed record in an outer {@link KV} whose key is always an empty byte array,
     * so that every element carries an equal key.
     */
    private static class CreateArtificialKeyFn extends DoFn<KV<SortKey, DataChangeRecord>, KV<byte[], KV<SortKey, DataChangeRecord>>> {
        private static final long serialVersionUID = -3363057370822294686L;

        private CreateArtificialKeyFn() {
        }

        /** Emits the element unchanged as the value of a pair keyed by a fresh zero-length byte array. */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<SortKey, DataChangeRecord> kv, DoFn.OutputReceiver<KV<byte[], KV<SortKey, DataChangeRecord>>> outputReceiver) {
            final byte[] sharedKey = new byte[0];
            final KV<byte[], KV<SortKey, DataChangeRecord>> keyed = KV.of(sharedKey, kv);
            outputReceiver.output(keyed);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT$KeyBySortKeyFn.class */
    /**
     * Pairs each {@link DataChangeRecord} with a {@link SortKey} built from the record's commit
     * timestamp and server transaction id.
     */
    private static class KeyBySortKeyFn extends DoFn<DataChangeRecord, KV<SortKey, DataChangeRecord>> {
        private static final long serialVersionUID = 1270485392415293532L;

        private KeyBySortKeyFn() {
        }

        /** Derives the sort key from the record and emits the (key, record) pair. */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<KV<SortKey, DataChangeRecord>> outputReceiver) {
            final SortKey key = new SortKey(dataChangeRecord.getCommitTimestamp(), dataChangeRecord.getServerTransactionId());
            outputReceiver.output(KV.of(key, dataChangeRecord));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT$SortKey.class */
    /**
     * Ordering key for change records: commit timestamp first, transaction id as the tie-breaker.
     *
     * <p>The class is a mutable, serializable bean (no-arg constructor plus setters). Both fields
     * must be non-null when {@link #compareTo(SortKey)} is called, otherwise it throws
     * {@link NullPointerException}.
     */
    public static class SortKey implements Serializable, Comparable<SortKey> {
        private static final long serialVersionUID = 2105939115467195036L;

        /**
         * Shared comparator: seconds, then nanos, then transaction id.
         *
         * <p>Seconds and nanos are compared as integers on purpose. Folding them into a single
         * {@code double} (seconds + nanos / 1e9) loses nanosecond resolution at current epoch
         * magnitudes, so distinct timestamps could compare as equal.
         */
        private static final Comparator<SortKey> ORDER =
            Comparator.<SortKey>comparingLong(key -> key.getCommitTimestamp().getSeconds())
                .thenComparingInt(key -> key.getCommitTimestamp().getNanos())
                .thenComparing(SortKey::getTransactionId);

        private Timestamp commitTimestamp;
        private String transactionId;

        /** Creates a key with both fields unset ({@code null}). */
        public SortKey() {
        }

        /**
         * @param timestamp commit timestamp of the transaction
         * @param str transaction id
         */
        public SortKey(Timestamp timestamp, String str) {
            this.commitTimestamp = timestamp;
            this.transactionId = str;
        }

        public static long getSerialVersionUID() {
            return serialVersionUID;
        }

        public Timestamp getCommitTimestamp() {
            return this.commitTimestamp;
        }

        public void setCommitTimestamp(Timestamp timestamp) {
            this.commitTimestamp = timestamp;
        }

        public String getTransactionId() {
            return this.transactionId;
        }

        public void setTransactionId(String str) {
            this.transactionId = str;
        }

        /** Two keys are equal when both the commit timestamp and the transaction id are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SortKey sortKey = (SortKey) obj;
            return Objects.equals(this.commitTimestamp, sortKey.commitTimestamp) && Objects.equals(this.transactionId, sortKey.transactionId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.commitTimestamp, this.transactionId);
        }

        /** Readable form for log output. */
        @Override
        public String toString() {
            return "SortKey{commitTimestamp=" + this.commitTimestamp + ", transactionId='" + this.transactionId + "'}";
        }

        /**
         * Orders by commit timestamp (seconds, then nanos) and then by transaction id.
         *
         * @throws NullPointerException if either key has a null commit timestamp, or if the
         *     timestamps tie and a transaction id is null
         */
        @Override // java.lang.Comparable
        public int compareTo(SortKey sortKey) {
            return ORDER.compare(this, sortKey);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamOrderedByTimestampAndTransactionIdIT$ToStringFn.class */
    /**
     * Renders one batch of keyed records as a single string: the records are sorted by their
     * {@link SortKey} and their {@code getRecordString} representations are concatenated in order.
     */
    private static class ToStringFn extends DoFn<Iterable<KV<SortKey, DataChangeRecord>>, String> {
        private static final long serialVersionUID = 2307936669684679038L;

        private ToStringFn() {
        }

        /** Sorts the batch by key and outputs the concatenated record strings. */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Iterable<KV<SortKey, DataChangeRecord>> iterable, DoFn.OutputReceiver<String> outputReceiver) {
            final List<KV<SortKey, DataChangeRecord>> ordered = StreamSupport.stream(iterable.spliterator(), false)
                .sorted((left, right) -> left.getKey().compareTo(right.getKey()))
                .collect(Collectors.toList());

            final StringBuilder joined = new StringBuilder();
            for (KV<SortKey, DataChangeRecord> entry : ordered) {
                joined.append(SpannerChangeStreamOrderedByTimestampAndTransactionIdIT.getRecordString(entry.getValue()));
            }
            outputReceiver.output(joined.toString());
        }
    }

    @BeforeClass
    /**
     * One-time fixture: copies the connection ids from the shared test environment, then creates
     * the Singers table and a change stream for it, and finally obtains the database client.
     */
    public static void setup() throws InterruptedException, ExecutionException, TimeoutException {
        projectId = ENV.getProjectId();
        instanceId = ENV.getInstanceId();
        databaseId = ENV.getDatabaseId();

        // The change stream is created for the table that was just created.
        final String singersTable = ENV.createSingersTable();
        tableName = singersTable;
        changeStreamName = ENV.createChangeStreamFor(singersTable);

        databaseClient = ENV.getDatabaseClient();
    }

    /**
     * Writes an initial row and then three identical batches of transactions, reads the resulting
     * change stream between the first and last commit timestamps, and asserts that the buffered,
     * sorted output consists of exactly three batches with the expected record strings.
     */
    @Test
    public void testTransactionBoundaries() {
        final SpannerConfig spannerConfig = SpannerConfig.create().withProjectId(projectId).withInstanceId(instanceId).withDatabaseId(databaseId);

        // Seed row 0; its commit timestamp is the inclusive start of the change stream read.
        final List<Mutation> seedMutations = new ArrayList<>();
        seedMutations.add(insertRecordMutation(0L, "FirstName0", "LastName0"));
        final Timestamp startTimestamp = databaseClient.write(seedMutations);

        // Three batches separated by 2 second pauses. The pause equals the timer interval passed
        // to BufferRecordsUntilOutputTimestamp below; presumably this keeps each batch in its own
        // timer window.
        writeTransactionsToDatabase();
        pauseBetweenBatches(2000L);
        writeTransactionsToDatabase();
        pauseBetweenBatches(2000L);
        final Timestamp endTimestamp = writeTransactionsToDatabase();

        // The first batch additionally contains the seed insert and the delete of row 0.
        final String firstBatch = "{\"SingerId\":\"0\"},INSERT\n{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n{\"SingerId\":\"1\"},DELETE\n{\"SingerId\":\"3\"},INSERT\n{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n{\"SingerId\":\"0\"},DELETE\n";
        final String secondBatch = "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n{\"SingerId\":\"1\"},DELETE\n{\"SingerId\":\"3\"},INSERT\n{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n";
        final String thirdBatch = "{\"SingerId\":\"1\"}{\"SingerId\":\"2\"},INSERT\n{\"SingerId\":\"1\"},DELETE\n{\"SingerId\":\"3\"},INSERT\n{\"SingerId\":\"2\"}{\"SingerId\":\"3\"},DELETE\n";

        PAssert.that(
                this.pipeline
                    .apply(SpannerIO.readChangeStream()
                        .withSpannerConfig(spannerConfig)
                        .withChangeStreamName(changeStreamName)
                        .withMetadataDatabase(databaseId)
                        .withInclusiveStartAt(startTimestamp)
                        .withInclusiveEndAt(endTimestamp))
                    .apply(ParDo.of(new KeyBySortKeyFn()))
                    .apply(ParDo.of(new CreateArtificialKeyFn()))
                    .apply(ParDo.of(new BufferRecordsUntilOutputTimestamp(endTimestamp, 2L)))
                    .apply(ParDo.of(new ToStringFn())))
            .containsInAnyOrder(firstBatch, secondBatch, thirdBatch);
        this.pipeline.runWithAdditionalOptionArgs(Collections.singletonList("--streaming")).waitUntilFinish();
    }

    /**
     * Sleeps for the given number of milliseconds. If the thread is interrupted, the exception is
     * logged and the interrupt flag is restored so that callers further up can still observe it.
     */
    private static void pauseBetweenBatches(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            LOG.error(e.toString(), e);
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the one-line text form of a change record used in logs and assertions: the keys JSON
     * of every mod concatenated without a separator, then a comma, the mod type, and a newline.
     *
     * @param dataChangeRecord record to render
     * @return {@code <keysJson...>,<modType>\n}
     */
    public static String getRecordString(DataChangeRecord dataChangeRecord) {
        // Append directly to the builder instead of re-copying an intermediate String per mod.
        StringBuilder sb = new StringBuilder();
        for (Mod mod : dataChangeRecord.getMods()) {
            sb.append(mod.getKeysJson());
        }
        sb.append(',').append(dataChangeRecord.getModType().toString()).append('\n');
        return sb.toString();
    }

    /**
     * Commits four consecutive transactions against the test table and returns the commit
     * timestamp of the last one:
     * insert rows 1 and 2; insert row 3 and delete row 1; delete rows 2 and 3; delete row 0.
     *
     * @return commit timestamp of the fourth transaction
     */
    private Timestamp writeTransactionsToDatabase() {
        final List<Mutation> batch = new ArrayList<>();

        // NOTE(review): row 1 is written with "LastName2"; possibly a typo for "LastName1" - confirm.
        batch.add(insertRecordMutation(1L, "FirstName1", "LastName2"));
        batch.add(insertRecordMutation(2L, "FirstName2", "LastName2"));
        final Timestamp firstCommit = databaseClient.write(batch);
        LOG.debug("The first transaction committed with timestamp: " + firstCommit.toString());

        batch.clear();
        batch.add(insertRecordMutation(3L, "FirstName3", "LastName3"));
        batch.add(deleteRecordMutation(1L));
        final Timestamp secondCommit = databaseClient.write(batch);
        LOG.debug("The second transaction committed with timestamp: " + secondCommit.toString());

        batch.clear();
        batch.add(deleteRecordMutation(2L));
        batch.add(deleteRecordMutation(3L));
        final Timestamp thirdCommit = databaseClient.write(batch);
        LOG.debug("The third transaction committed with timestamp: " + thirdCommit.toString());

        batch.clear();
        batch.add(deleteRecordMutation(0L));
        final Timestamp fourthCommit = databaseClient.write(batch);
        LOG.debug("The fourth transaction committed with timestamp: " + fourthCommit.toString());
        return fourthCommit;
    }

    /**
     * Builds an insert mutation for the test table.
     *
     * @param j value for the {@code SingerId} column
     * @param str value for the {@code FirstName} column
     * @param str2 value for the {@code LastName} column
     */
    private static Mutation insertRecordMutation(long j, String str, String str2) {
        return Mutation.newInsertBuilder(tableName)
            .set("SingerId").to(j)
            .set("FirstName").to(str)
            .set("LastName").to(str2)
            .build();
    }

    /**
     * Builds a delete mutation for the test table row whose single-column key equals {@code j}.
     */
    private static Mutation deleteRecordMutation(long j) {
        final Key rowKey = Key.of(j);
        final KeySet rows = KeySet.newBuilder().addKey(rowKey).build();
        return Mutation.delete(tableName, rows);
    }
}
