/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.ServiceFactory;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeyRange;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.spanner.FakeServiceFactory;
import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;
import org.apache.beam.sdk.io.gcp.spanner.MutationSizeEstimator;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class SpannerIOWriteTest
implements Serializable {
    private static final long CELLS_PER_KEY = 7L;
    private static final String TABLE_NAME = "test-table";
    private static final SpannerConfig SPANNER_CONFIG = SpannerConfig.create().withDatabaseId("test-database").withInstanceId("test-instance").withProjectId("test-project");
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    @Captor
    public transient ArgumentCaptor<Iterable<Mutation>> mutationBatchesCaptor;
    @Captor
    public transient ArgumentCaptor<Options.ReadQueryUpdateTransactionOption> optionsCaptor;
    @Captor
    public transient ArgumentCaptor<Iterable<MutationGroup>> mutationGroupListCaptor;
    @Captor
    public transient ArgumentCaptor<MutationGroup> mutationGroupCaptor;
    private FakeServiceFactory serviceFactory;

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        this.serviceFactory = new FakeServiceFactory();
        ReadOnlyTransaction tx = (ReadOnlyTransaction)Mockito.mock(ReadOnlyTransaction.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn((Object)tx);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)this.mutationBatchesCaptor.capture(), new Options.TransactionOption[]{(Options.TransactionOption)this.optionsCaptor.capture()})).thenReturn(null);
        SpannerIOWriteTest.preparePkMetadata(tx, Arrays.asList(SpannerIOWriteTest.pkMetadata("tEsT-TaBlE", "key", "ASC")));
        SpannerIOWriteTest.prepareColumnMetadata(tx, Arrays.asList(SpannerIOWriteTest.columnMetadata("tEsT-TaBlE", "key", "INT64", 7L)));
        SpannerIOWriteTest.preparePgColumnMetadata(tx, Arrays.asList(SpannerIOWriteTest.columnMetadata("tEsT-TaBlE", "key", "bigint", 7L)));
        MetricsContainerImpl container = new MetricsContainerImpl(null);
        MetricsEnvironment.setProcessWideContainer((MetricsContainer)container);
        MetricsEnvironment.setCurrentContainer((MetricsContainer)container);
    }

    private SpannerSchema getSchema() {
        return SpannerSchema.builder().addColumn("tEsT-TaBlE", "key", "INT64", 7L).addKeyPart("tEsT-TaBlE", "key", false).build();
    }

    static Struct columnMetadata(String tableName, String columnName, String type, long cellsMutated) {
        return ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("table_name").to(tableName)).set("column_name").to(columnName)).set("spanner_type").to(type)).set("cells_mutated").to(cellsMutated)).build();
    }

    static Struct pkMetadata(String tableName, String columnName, String ordering) {
        return ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("table_name").to(tableName)).set("column_name").to(columnName)).set("column_ordering").to(ordering)).build();
    }

    static void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
        Type type = Type.struct((Type.StructField[])new Type.StructField[]{Type.StructField.of((String)"table_name", (Type)Type.string()), Type.StructField.of((String)"column_name", (Type)Type.string()), Type.StructField.of((String)"spanner_type", (Type)Type.string()), Type.StructField.of((String)"cells_mutated", (Type)Type.int64())});
        Mockito.when((Object)tx.executeQuery((Statement)Mockito.argThat((ArgumentMatcher)new ArgumentMatcher<Statement>(){

            public boolean matches(Statement argument) {
                if (!(argument instanceof Statement)) {
                    return false;
                }
                Statement st = argument;
                return st.getSql().contains("information_schema.columns");
            }
        }), new Options.QueryOption[0])).thenReturn((Object)ResultSets.forRows((Type)type, rows));
    }

    static void preparePgColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
        Type type = Type.struct((Type.StructField[])new Type.StructField[]{Type.StructField.of((String)"table_name", (Type)Type.string()), Type.StructField.of((String)"column_name", (Type)Type.string()), Type.StructField.of((String)"spanner_type", (Type)Type.string()), Type.StructField.of((String)"cells_mutated", (Type)Type.int64())});
        Mockito.when((Object)tx.executeQuery((Statement)Mockito.argThat((ArgumentMatcher)new ArgumentMatcher<Statement>(){

            public boolean matches(Statement argument) {
                if (!(argument instanceof Statement)) {
                    return false;
                }
                Statement st = argument;
                return st.getSql().contains("information_schema.columns") && st.getSql().contains("('information_schema', 'spanner_sys', 'pg_catalog')");
            }
        }), new Options.QueryOption[0])).thenReturn((Object)ResultSets.forRows((Type)type, rows));
    }

    static void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
        Type type = Type.struct((Type.StructField[])new Type.StructField[]{Type.StructField.of((String)"table_name", (Type)Type.string()), Type.StructField.of((String)"column_name", (Type)Type.string()), Type.StructField.of((String)"column_ordering", (Type)Type.string())});
        Mockito.when((Object)tx.executeQuery((Statement)Mockito.argThat((ArgumentMatcher)new ArgumentMatcher<Statement>(){

            public boolean matches(Statement argument) {
                if (!(argument instanceof Statement)) {
                    return false;
                }
                Statement st = argument;
                return st.getSql().contains("information_schema.index_columns");
            }
        }), new Options.QueryOption[0])).thenReturn((Object)ResultSets.forRows((Type)type, rows));
    }

    @Test
    public void emptyTransform() throws Exception {
        SpannerIO.Write write = SpannerIO.write();
        this.thrown.expect(NullPointerException.class);
        this.thrown.expectMessage("requires instance id to be set with");
        write.expand(null);
    }

    @Test
    public void emptyInstanceId() throws Exception {
        SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
        this.thrown.expect(NullPointerException.class);
        this.thrown.expectMessage("requires instance id to be set with");
        write.expand(null);
    }

    @Test
    public void emptyDatabaseId() throws Exception {
        SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
        this.thrown.expect(NullPointerException.class);
        this.thrown.expectMessage("requires database id to be set with");
        write.expand(null);
    }

    @Test
    public void singleMutationPipeline() throws Exception {
        Mutation mutation = SpannerIOWriteTest.buildUpsertMutation(2L);
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)mutation, (Object[])new Mutation[0]));
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory));
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L)));
    }

    @Test
    public void singlePgMutationPipeline() throws Exception {
        Mutation mutation = SpannerIOWriteTest.buildUpsertMutation(2L);
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)mutation, (Object[])new Mutation[0]));
        PCollectionView pgDialectView = (PCollectionView)((PCollection)this.pipeline.apply("Create PG dialect", (PTransform)Create.of((Object)Dialect.POSTGRESQL, (Object[])new Dialect[0]))).apply((PTransform)View.asSingleton());
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withDialectView(pgDialectView));
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L)));
    }

    @Test
    public void singleMutationPipelineNoProjectId() throws Exception {
        Mutation mutation = SpannerIOWriteTest.buildUpsertMutation(2L);
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)mutation, (Object[])new Mutation[0]));
        SpannerConfig config = SpannerConfig.create().withInstanceId("test-instance").withDatabaseId("test-database");
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(config).withServiceFactory((ServiceFactory)this.serviceFactory));
        this.pipeline.run();
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)1))).writeAtLeastOnceWithOptions(SpannerIOWriteTest.mutationsInNoOrder(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L))), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1L);
    }

    @Test
    public void singleMutationPipelineNullProjectId() throws Exception {
        Mutation mutation = SpannerIOWriteTest.buildUpsertMutation(2L);
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)mutation, (Object[])new Mutation[0]));
        SpannerConfig config = SpannerConfig.create().withProjectId((String)null).withInstanceId("test-instance").withDatabaseId("test-database");
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(config).withServiceFactory((ServiceFactory)this.serviceFactory));
        this.pipeline.run();
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)1))).writeAtLeastOnceWithOptions(SpannerIOWriteTest.mutationsInNoOrder(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L))), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1L);
    }

    @Test
    public void singleMutationGroupPipeline() throws Exception {
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), (Object[])new MutationGroup[0]));
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).grouped());
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)));
    }

    @Test
    public void singlePgMutationGroupPipeline() throws Exception {
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), (Object[])new MutationGroup[0]));
        PCollectionView pgDialectView = (PCollectionView)((PCollection)this.pipeline.apply("Create PG dialect", (PTransform)Create.of((Object)Dialect.POSTGRESQL, (Object[])new Dialect[0]))).apply((PTransform)View.asSingleton());
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withDialectView(pgDialectView).grouped());
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)));
    }

    @Test
    public void metricsForDifferentTables() throws Exception {
        Mutation mutation = SpannerIOWriteTest.buildUpsertMutation(2L);
        Mutation mutation2 = ((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"other-table").set("key").to("3L")).build();
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)mutation, (Object[])new Mutation[]{mutation2}));
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory));
        this.pipeline.run();
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, "other-table", "ok", 1L);
    }

    private void verifyBatches(Iterable<Mutation> ... batches) {
        for (Iterable<Mutation> b : batches) {
            ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)1))).writeAtLeastOnceWithOptions(SpannerIOWriteTest.mutationsInNoOrder(b), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        }
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", batches.length);
    }

    @Test
    public void noBatching() throws Exception {
        FakeServiceFactory fakeServiceFactory = new FakeServiceFactory();
        ReadOnlyTransaction tx = (ReadOnlyTransaction)Mockito.mock(ReadOnlyTransaction.class);
        Mockito.when((Object)fakeServiceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn((Object)tx);
        Mockito.when((Object)fakeServiceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)this.mutationBatchesCaptor.capture(), new Options.TransactionOption[]{(Options.TransactionOption)this.optionsCaptor.capture()})).thenReturn(null);
        PCollection mutations = (PCollection)this.pipeline.apply((PTransform)Create.of((Object)SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), (Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0])}));
        mutations.apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)fakeServiceFactory).withBatchSizeBytes(1L).grouped());
        this.pipeline.run();
        ((DatabaseClient)Mockito.verify((Object)fakeServiceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)1))).writeAtLeastOnceWithOptions(SpannerIOWriteTest.mutationsInNoOrder(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L))), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        ((DatabaseClient)Mockito.verify((Object)fakeServiceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)1))).writeAtLeastOnceWithOptions(SpannerIOWriteTest.mutationsInNoOrder(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L))), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        ((ReadOnlyTransaction)Mockito.verify((Object)tx, (VerificationMode)Mockito.never())).executeQuery((Statement)Matchers.any(), new Options.QueryOption[0]);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 2L);
    }

    @Test
    public void streamingWrites() throws Exception {
        TestStream testStream = TestStream.create((Coder)SerializableCoder.of(Mutation.class)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(1L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(2L)}).advanceProcessingTime(Duration.standardMinutes((long)1L)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(3L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(4L)}).advanceProcessingTime(Duration.standardMinutes((long)1L)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(5L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(6L)}).advanceWatermarkToInfinity();
        ((PCollection)this.pipeline.apply((PTransform)testStream)).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory));
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L)));
    }

    @Test
    public void streamingWritesWithPriority() throws Exception {
        TestStream testStream = TestStream.create((Coder)SerializableCoder.of(Mutation.class)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(1L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(2L)}).advanceProcessingTime(Duration.standardMinutes((long)1L)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(3L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(4L)}).advanceProcessingTime(Duration.standardMinutes((long)1L)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(5L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(6L)}).advanceWatermarkToInfinity();
        SpannerIO.Write write = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withHighPriority();
        ((PCollection)this.pipeline.apply((PTransform)testStream)).apply((PTransform)write);
        this.pipeline.run();
        Assert.assertEquals((Object)Options.RpcPriority.HIGH, (Object)write.getSpannerConfig().getRpcPriority().get());
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L)));
    }

    @Test
    public void streamingWritesWithGrouping() throws Exception {
        TestStream testStream = TestStream.create((Coder)SerializableCoder.of(Mutation.class)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(1L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(6L)}).advanceWatermarkToInfinity();
        ((PCollection)this.pipeline.apply((PTransform)testStream)).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withGroupingFactor(40).withMaxNumRows(2L));
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L)));
    }

    @Test
    public void streamingWritesWithGroupingWithPriority() throws Exception {
        TestStream testStream = TestStream.create((Coder)SerializableCoder.of(Mutation.class)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(1L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(6L)}).advanceWatermarkToInfinity();
        SpannerIO.Write write = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withGroupingFactor(40).withMaxNumRows(2L).withLowPriority();
        ((PCollection)this.pipeline.apply((PTransform)testStream)).apply((PTransform)write);
        this.pipeline.run();
        Assert.assertEquals((Object)Options.RpcPriority.LOW, (Object)write.getSpannerConfig().getRpcPriority().get());
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(2L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L)));
    }

    @Test
    public void streamingWritesNoGrouping() throws Exception {
        TestStream testStream = TestStream.create((Coder)SerializableCoder.of(Mutation.class)).addElements((Object)SpannerIOWriteTest.buildUpsertMutation(1L), (Object[])new Mutation[]{SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(6L)}).advanceWatermarkToInfinity();
        ((PCollection)this.pipeline.apply((PTransform)testStream)).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withMaxNumRows(2L));
        this.pipeline.run();
        this.verifyBatches(SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(4L)), SpannerIOWriteTest.buildMutationBatch(SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(6L)));
    }

    @Test
    public void reportFailures() throws Exception {
        MutationGroup[] mutationGroups = new MutationGroup[10];
        for (int i = 0; i < mutationGroups.length; ++i) {
            mutationGroups[i] = SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(Long.valueOf(i)), new Mutation[0]);
        }
        List<MutationGroup> mutationGroupList = Arrays.asList(mutationGroups);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenAnswer(invocationOnMock -> {
            Preconditions.checkNotNull((Object)invocationOnMock.getArguments()[0]);
            throw SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ALREADY_EXISTS, (String)"oops");
        });
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationGroupList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES).grouped());
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)mutationGroups.length, (long)Iterables.size((Iterable)m));
            return null;
        });
        PAssert.that((PCollection)result.getFailedMutations()).containsInAnyOrder(mutationGroupList);
        this.pipeline.run().waitUntilFinish();
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)20))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "already_exists", 20L);
    }

    @Test
    public void deadlineExceededRetries() throws InterruptedException {
        List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation(1L));
        SpannerIO.WriteToSpannerFn.sleeper = (Sleeper)Mockito.mock(Sleeper.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.DEADLINE_EXCEEDED, (String)"simulated Timeout 1")}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.DEADLINE_EXCEEDED, (String)"simulated Timeout 2")}).thenReturn((Object)new CommitResponse(Timestamp.now()));
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)0L, (long)Iterables.size((Iterable)m));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        ((Sleeper)Mockito.verify((Object)SpannerIO.WriteToSpannerFn.sleeper, (VerificationMode)Mockito.times((int)2))).sleep(ArgumentMatchers.anyLong());
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)3))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", 2L);
    }

    @Test
    public void deadlineExceededFailsAfterRetries() throws InterruptedException {
        List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation(1L));
        SpannerIO.WriteToSpannerFn.sleeper = (Sleeper)Mockito.mock(Sleeper.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.DEADLINE_EXCEEDED, (String)"simulated Timeout")});
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withMaxCumulativeBackoff(Duration.standardHours((long)2L)).withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)1L, (long)Iterables.size((Iterable)m));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        long totalSleep = Mockito.mockingDetails((Object)SpannerIO.WriteToSpannerFn.sleeper).getInvocations().stream().mapToLong(i -> (Long)i.getArgument(0)).reduce(0L, Long::sum);
        Assert.assertTrue((String)String.format("Should be least 7200s of sleep, got %d", totalSleep), (totalSleep >= Duration.standardHours((long)2L).getMillis() ? 1 : 0) != 0);
        int numSleeps = Mockito.mockingDetails((Object)SpannerIO.WriteToSpannerFn.sleeper).getInvocations().size();
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)(numSleeps + 2)))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", numSleeps + 2);
    }

    @Test
    public void retryOnSchemaChangeException() throws InterruptedException {
        List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation(1L));
        String errString = "Transaction aborted. Database schema probably changed during transaction, retry may succeed.";
        SpannerIO.WriteToSpannerFn.sleeper = (Sleeper)Mockito.mock(Sleeper.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenReturn((Object)new CommitResponse(Timestamp.now()));
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withFailureMode(SpannerIO.FailureMode.FAIL_FAST));
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)0L, (long)Iterables.size((Iterable)m));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        ((Sleeper)Mockito.verify((Object)SpannerIO.WriteToSpannerFn.sleeper, (VerificationMode)Mockito.times((int)0))).sleep(ArgumentMatchers.anyLong());
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)3))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 2L);
    }

    @Test
    public void retryMaxOnSchemaChangeException() throws InterruptedException {
        List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation(1L));
        String errString = "Transaction aborted. Database schema probably changed during transaction, retry may succeed.";
        SpannerIO.WriteToSpannerFn.sleeper = (Sleeper)Mockito.mock(Sleeper.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)});
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage(errString);
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withFailureMode(SpannerIO.FailureMode.FAIL_FAST));
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)1L, (long)Iterables.size((Iterable)m));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        ((Sleeper)Mockito.verify((Object)SpannerIO.WriteToSpannerFn.sleeper, (VerificationMode)Mockito.times((int)0))).sleep(ArgumentMatchers.anyLong());
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)5))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 5L);
    }

    @Test
    public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException {
        List<Mutation> mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation(1L));
        String errString = "Transaction aborted. Database schema probably changed during transaction, retry may succeed.";
        SpannerIO.WriteToSpannerFn.sleeper = (Sleeper)Mockito.mock(Sleeper.class);
        Mockito.when((Object)this.serviceFactory.mockDatabaseClient().writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)})).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.DEADLINE_EXCEEDED, (String)"simulated Timeout 1")}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.ABORTED, (String)errString)}).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException((ErrorCode)ErrorCode.DEADLINE_EXCEEDED, (String)"simulated Timeout 2")}).thenReturn((Object)new CommitResponse(Timestamp.now()));
        SpannerWriteResult result = (SpannerWriteResult)((PCollection)this.pipeline.apply((PTransform)Create.of(mutationList))).apply((PTransform)SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory((ServiceFactory)this.serviceFactory).withBatchSizeBytes(0L).withFailureMode(SpannerIO.FailureMode.FAIL_FAST));
        PAssert.that((PCollection)result.getFailedMutations()).satisfies((SerializableFunction & Serializable)m -> {
            Assert.assertEquals((long)0L, (long)Iterables.size((Iterable)m));
            return null;
        });
        this.pipeline.run().waitUntilFinish();
        ((Sleeper)Mockito.verify((Object)SpannerIO.WriteToSpannerFn.sleeper, (VerificationMode)Mockito.times((int)2))).sleep(ArgumentMatchers.anyLong());
        ((DatabaseClient)Mockito.verify((Object)this.serviceFactory.mockDatabaseClient(), (VerificationMode)Mockito.times((int)8))).writeAtLeastOnceWithOptions((Iterable)Matchers.any(), new Options.TransactionOption[]{(Options.TransactionOption)Matchers.any(Options.ReadQueryUpdateTransactionOption.class)});
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 5L);
        this.verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", 2L);
    }

    @Test
    public void displayDataWrite() throws Exception {
        SpannerIO.Write write = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withBatchSizeBytes(123L).withMaxNumMutations(456L).withMaxNumRows(789L).withGroupingFactor(100);
        DisplayData data = DisplayData.from((HasDisplayData)write);
        MatcherAssert.assertThat((Object)data.items(), (Matcher)org.hamcrest.Matchers.hasSize((int)7));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"projectId", (String)"test-project"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"instanceId", (String)"test-instance"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"databaseId", (String)"test-database"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"batchSizeBytes", (long)123L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"maxNumMutations", (long)456L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"maxNumRows", (long)789L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"groupingFactor", (String)"100"));
        write = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG);
        data = DisplayData.from((HasDisplayData)write);
        MatcherAssert.assertThat((Object)data.items(), (Matcher)org.hamcrest.Matchers.hasSize((int)7));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"groupingFactor", (String)"DEFAULT"));
    }

    @Test
    public void displayDataWriteGrouped() throws Exception {
        SpannerIO.WriteGrouped writeGrouped = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withBatchSizeBytes(123L).withMaxNumMutations(456L).withMaxNumRows(789L).withGroupingFactor(100).grouped();
        DisplayData data = DisplayData.from((HasDisplayData)writeGrouped);
        MatcherAssert.assertThat((Object)data.items(), (Matcher)org.hamcrest.Matchers.hasSize((int)7));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"projectId", (String)"test-project"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"instanceId", (String)"test-instance"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"databaseId", (String)"test-database"));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"batchSizeBytes", (long)123L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"maxNumMutations", (long)456L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"maxNumRows", (long)789L));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"groupingFactor", (String)"100"));
        writeGrouped = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).grouped();
        data = DisplayData.from((HasDisplayData)writeGrouped);
        MatcherAssert.assertThat((Object)data.items(), (Matcher)org.hamcrest.Matchers.hasSize((int)7));
        MatcherAssert.assertThat((Object)data, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"groupingFactor", (String)"DEFAULT"));
    }

    @Test
    public void testBatchableMutationFilterFn_cells() {
        Mutation all = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.all());
        Mutation prefix = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.prefixRange((Key)Key.of((Object[])new Object[]{1L})));
        Mutation range = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.range((KeyRange)KeyRange.openOpen((Key)Key.of((Object[])new Object[]{1L}), (Key)Key.newBuilder().build())));
        MutationGroup[] mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])};
        SpannerIO.BatchableMutationFilterFn testFn = new SpannerIO.BatchableMutationFilterFn(null, null, 10000000L, 21L, 1000L);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((Object)((MutationGroup)this.mutationGroupCaptor.capture()));
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((TupleTag)Matchers.any(), (Object)((Iterable)this.mutationGroupListCaptor.capture()));
        for (MutationGroup m : mutationGroups) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext);
        }
        MatcherAssert.assertThat((Object)this.mutationGroupCaptor.getAllValues(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0])}));
        Iterable unbatchableMutations = Iterables.concat((Iterable)this.mutationGroupListCaptor.getAllValues());
        MatcherAssert.assertThat((Object)unbatchableMutations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])}));
    }

    @Test
    public void testBatchableMutationFilterFn_size() {
        Mutation all = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.all());
        Mutation prefix = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.prefixRange((Key)Key.of((Object[])new Object[]{1L})));
        Mutation range = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.range((KeyRange)KeyRange.openOpen((Key)Key.of((Object[])new Object[]{1L}), (Key)Key.newBuilder().build())));
        MutationGroup[] mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])};
        long mutationSize = MutationSizeEstimator.sizeOf((Mutation)SpannerIOWriteTest.buildUpsertMutation(1L));
        SpannerIO.BatchableMutationFilterFn testFn = new SpannerIO.BatchableMutationFilterFn(null, null, mutationSize * 3L, 1000L, 1000L);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((Object)((MutationGroup)this.mutationGroupCaptor.capture()));
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((TupleTag)Matchers.any(), (Object)((Iterable)this.mutationGroupListCaptor.capture()));
        for (MutationGroup m : mutationGroups) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext);
        }
        MatcherAssert.assertThat((Object)this.mutationGroupCaptor.getAllValues(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0])}));
        Iterable unbatchableMutations = Iterables.concat((Iterable)this.mutationGroupListCaptor.getAllValues());
        MatcherAssert.assertThat((Object)unbatchableMutations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])}));
    }

    @Test
    public void testBatchableMutationFilterFn_rows() {
        Mutation all = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.all());
        Mutation prefix = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.prefixRange((Key)Key.of((Object[])new Object[]{1L})));
        Mutation range = Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.range((KeyRange)KeyRange.openOpen((Key)Key.of((Object[])new Object[]{1L}), (Key)Key.newBuilder().build())));
        MutationGroup[] mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])};
        SpannerIO.BatchableMutationFilterFn testFn = new SpannerIO.BatchableMutationFilterFn(null, null, 1000L, 1000L, 3L);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((Object)((MutationGroup)this.mutationGroupCaptor.capture()));
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((TupleTag)Matchers.any(), (Object)((Iterable)this.mutationGroupListCaptor.capture()));
        for (MutationGroup m : mutationGroups) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext);
        }
        MatcherAssert.assertThat((Object)this.mutationGroupCaptor.getAllValues(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), SpannerIOWriteTest.buildUpsertMutation(3L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0])}));
        Iterable unbatchableMutations = Iterables.concat((Iterable)this.mutationGroupListCaptor.getAllValues());
        MatcherAssert.assertThat((Object)unbatchableMutations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), SpannerIOWriteTest.buildUpsertMutation(3L), SpannerIOWriteTest.buildUpsertMutation(4L), SpannerIOWriteTest.buildUpsertMutation(5L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(all, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(prefix, new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(range, new Mutation[0])}));
    }

    @Test
    public void testBatchableMutationFilterFn_batchingDisabled() {
        Object[] mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(5L, 6L), new Mutation[0])};
        SpannerIO.BatchableMutationFilterFn testFn = new SpannerIO.BatchableMutationFilterFn(null, null, 0L, 0L, 0L);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((Object)((MutationGroup)this.mutationGroupCaptor.capture()));
        ((DoFn.ProcessContext)Mockito.doNothing().when((Object)mockProcessContext)).output((TupleTag)Matchers.any(), (Object)((Iterable)this.mutationGroupListCaptor.capture()));
        for (MutationGroup mutationGroup : mutationGroups) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)mutationGroup);
            testFn.processElement(mockProcessContext);
        }
        Assert.assertTrue((boolean)this.mutationGroupCaptor.getAllValues().isEmpty());
        Iterable unbatchableMutations = Iterables.concat((Iterable)this.mutationGroupListCaptor.getAllValues());
        MatcherAssert.assertThat((Object)unbatchableMutations, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])mutationGroups));
    }

    @Test
    public void testGatherSortAndBatchFn() throws Exception {
        MutationGroup[] mutationGroups;
        SpannerIO.GatherSortCreateBatchesFn testFn = new SpannerIO.GatherSortCreateBatchesFn(10000000L, 100L, 5L, 100L, null);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        DoFn.FinishBundleContext mockFinishBundleContext = (DoFn.FinishBundleContext)Mockito.mock(DoFn.FinishBundleContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.FinishBundleContext)Mockito.doNothing().when((Object)mockFinishBundleContext)).output((Object)((Iterable)this.mutationGroupListCaptor.capture()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        for (MutationGroup m : mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(7L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(12L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(8L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(9L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), new Mutation[0])}) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext, null);
        }
        testFn.finishBundle(mockFinishBundleContext);
        ((DoFn.ProcessContext)Mockito.verify((Object)mockProcessContext, (VerificationMode)Mockito.never())).output((Object)((Iterable)Matchers.any()));
        ((DoFn.FinishBundleContext)Mockito.verify((Object)mockFinishBundleContext, (VerificationMode)Mockito.times((int)3))).output((Object)((Iterable)Matchers.any()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        MatcherAssert.assertThat((Object)this.mutationGroupListCaptor.getAllValues(), (Matcher)org.hamcrest.Matchers.contains((Object[])new Iterable[]{Arrays.asList(SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), new Mutation[0])), Arrays.asList(SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(7L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(8L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(9L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0])), Arrays.asList(SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(12L), new Mutation[0]))}));
    }

    @Test
    public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception {
        MutationGroup[] mutationGroups;
        SpannerIO.GatherSortCreateBatchesFn testFn = new SpannerIO.GatherSortCreateBatchesFn(10000000L, 100L, 2L, 3L, null);
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        DoFn.FinishBundleContext mockFinishBundleContext = (DoFn.FinishBundleContext)Mockito.mock(DoFn.FinishBundleContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        DoFn.OutputReceiver mockOutputReceiver = (DoFn.OutputReceiver)Mockito.mock(DoFn.OutputReceiver.class);
        ((DoFn.OutputReceiver)Mockito.doNothing().when((Object)mockOutputReceiver)).output((Object)((Iterable)this.mutationGroupListCaptor.capture()));
        ((DoFn.FinishBundleContext)Mockito.doNothing().when((Object)mockFinishBundleContext)).output((Object)((Iterable)this.mutationGroupListCaptor.capture()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        for (MutationGroup m : mutationGroups = new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(7L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(9L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(8L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(6L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), new Mutation[0])}) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext, mockOutputReceiver);
        }
        testFn.finishBundle(mockFinishBundleContext);
        ((DoFn.OutputReceiver)Mockito.verify((Object)mockOutputReceiver, (VerificationMode)Mockito.times((int)3))).output((Object)((Iterable)Matchers.any()));
        ((DoFn.FinishBundleContext)Mockito.verify((Object)mockFinishBundleContext, (VerificationMode)Mockito.times((int)3))).output((Object)((Iterable)Matchers.any()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        List mgListGroups = this.mutationGroupListCaptor.getAllValues();
        Assert.assertEquals((long)6L, (long)mgListGroups.size());
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(0)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(1)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(7L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(9L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(2)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(3)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(4)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(6L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)mgListGroups.get(5)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildDeleteMutation(8L), new Mutation[0])}));
    }

    @Test
    public void testBatchFn_cells() throws Exception {
        SpannerIO.GatherSortCreateBatchesFn testFn = new SpannerIO.GatherSortCreateBatchesFn(10000000L, 21L, 100L, 100L, null);
        this.testAndVerifyBatches(testFn);
    }

    @Test
    public void testBatchFn_size() throws Exception {
        long mutationSize = MutationSizeEstimator.sizeOf((Mutation)SpannerIOWriteTest.buildUpsertMutation(1L));
        SpannerIO.GatherSortCreateBatchesFn testFn = new SpannerIO.GatherSortCreateBatchesFn(mutationSize * 3L, 100L, 100L, 100L, null);
        this.testAndVerifyBatches(testFn);
    }

    @Test
    public void testBatchFn_rows() throws Exception {
        SpannerIO.GatherSortCreateBatchesFn testFn = new SpannerIO.GatherSortCreateBatchesFn(10000L, 100L, 3L, 100L, null);
        this.testAndVerifyBatches(testFn);
    }

    private void testAndVerifyBatches(SpannerIO.GatherSortCreateBatchesFn testFn) throws Exception {
        DoFn.ProcessContext mockProcessContext = (DoFn.ProcessContext)Mockito.mock(DoFn.ProcessContext.class);
        DoFn.FinishBundleContext mockFinishBundleContext = (DoFn.FinishBundleContext)Mockito.mock(DoFn.FinishBundleContext.class);
        Mockito.when((Object)mockProcessContext.sideInput((PCollectionView)Matchers.any())).thenReturn((Object)this.getSchema());
        ((DoFn.FinishBundleContext)Mockito.doNothing().when((Object)mockFinishBundleContext)).output((Object)((Iterable)this.mutationGroupListCaptor.capture()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        List<MutationGroup> mutationGroups = Arrays.asList(SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L), SpannerIOWriteTest.buildUpsertMutation(7L), SpannerIOWriteTest.buildUpsertMutation(8L), SpannerIOWriteTest.buildUpsertMutation(9L)), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]));
        for (MutationGroup m : mutationGroups) {
            Mockito.when((Object)((MutationGroup)mockProcessContext.element())).thenReturn((Object)m);
            testFn.processElement(mockProcessContext, null);
        }
        testFn.finishBundle(mockFinishBundleContext);
        ((DoFn.FinishBundleContext)Mockito.verify((Object)mockFinishBundleContext, (VerificationMode)Mockito.times((int)4))).output((Object)((Iterable)Matchers.any()), (Instant)Matchers.any(), (BoundedWindow)Matchers.any());
        List batches = this.mutationGroupListCaptor.getAllValues();
        Assert.assertEquals((long)4L, (long)batches.size());
        MatcherAssert.assertThat((Object)((Iterable)batches.get(0)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(1L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(2L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(3L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)batches.get(1)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(4L), new Mutation[0])}));
        MatcherAssert.assertThat((Object)((Iterable)batches.get(2)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(5L), SpannerIOWriteTest.buildUpsertMutation(6L), SpannerIOWriteTest.buildUpsertMutation(7L), SpannerIOWriteTest.buildUpsertMutation(8L), SpannerIOWriteTest.buildUpsertMutation(9L))}));
        MatcherAssert.assertThat((Object)((Iterable)batches.get(3)), (Matcher)org.hamcrest.Matchers.contains((Object[])new MutationGroup[]{SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(10L), new Mutation[0]), SpannerIOWriteTest.buildMutationGroup(SpannerIOWriteTest.buildUpsertMutation(11L), new Mutation[0])}));
    }

    @Test
    public void testRefCountedSpannerAccessorOnlyOnce() {
        SpannerConfig config1 = SpannerConfig.create().toBuilder().setServiceFactory((ServiceFactory)this.serviceFactory).setProjectId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"project")).setInstanceId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test1")).setDatabaseId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test1")).build();
        SpannerIO.WriteToSpannerFn test1Fn = new SpannerIO.WriteToSpannerFn(config1, SpannerIO.FailureMode.REPORT_FAILURES, null);
        SpannerIO.WriteToSpannerFn test2Fn = new SpannerIO.WriteToSpannerFn(config1, SpannerIO.FailureMode.REPORT_FAILURES, null);
        SpannerIO.WriteToSpannerFn test3Fn = new SpannerIO.WriteToSpannerFn(config1, SpannerIO.FailureMode.REPORT_FAILURES, null);
        test1Fn.setup();
        test2Fn.setup();
        test3Fn.setup();
        test2Fn.teardown();
        test3Fn.teardown();
        test1Fn.teardown();
        ((Spanner)Mockito.verify((Object)this.serviceFactory.mockSpanner(), (VerificationMode)Mockito.times((int)1))).getDatabaseClient(DatabaseId.of((String)"project", (String)"test1", (String)"test1"));
        ((Spanner)Mockito.verify((Object)this.serviceFactory.mockSpanner(), (VerificationMode)Mockito.times((int)1))).close();
    }

    @Test
    public void testRefCountedSpannerAccessorDifferentDbsOnlyOnce() {
        SpannerConfig config1 = SpannerConfig.create().toBuilder().setServiceFactory((ServiceFactory)this.serviceFactory).setMaxCumulativeBackoff((ValueProvider)ValueProvider.StaticValueProvider.of((Object)Duration.standardSeconds((long)10L))).setProjectId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"project")).setInstanceId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test1")).setDatabaseId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test1")).build();
        SpannerConfig config2 = config1.toBuilder().setInstanceId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test2")).setDatabaseId((ValueProvider)ValueProvider.StaticValueProvider.of((Object)"test2")).build();
        SpannerIO.WriteToSpannerFn test1Fn = new SpannerIO.WriteToSpannerFn(config1, SpannerIO.FailureMode.REPORT_FAILURES, null);
        SpannerIO.WriteToSpannerFn test2Fn = new SpannerIO.WriteToSpannerFn(config1, SpannerIO.FailureMode.REPORT_FAILURES, null);
        SpannerIO.WriteToSpannerFn test3Fn = new SpannerIO.WriteToSpannerFn(config2, SpannerIO.FailureMode.REPORT_FAILURES, null);
        SpannerIO.WriteToSpannerFn test4Fn = new SpannerIO.WriteToSpannerFn(config2, SpannerIO.FailureMode.REPORT_FAILURES, null);
        test1Fn.setup();
        test2Fn.setup();
        test3Fn.setup();
        test4Fn.setup();
        test2Fn.teardown();
        test3Fn.teardown();
        test4Fn.teardown();
        test1Fn.teardown();
        ((Spanner)Mockito.verify((Object)this.serviceFactory.mockSpanner(), (VerificationMode)Mockito.times((int)1))).getDatabaseClient((DatabaseId)ArgumentMatchers.eq((Object)DatabaseId.of((String)"project", (String)"test1", (String)"test1")));
        ((Spanner)Mockito.verify((Object)this.serviceFactory.mockSpanner(), (VerificationMode)Mockito.times((int)1))).getDatabaseClient((DatabaseId)ArgumentMatchers.eq((Object)DatabaseId.of((String)"project", (String)"test2", (String)"test2")));
        ((Spanner)Mockito.verify((Object)this.serviceFactory.mockSpanner(), (VerificationMode)Mockito.times((int)2))).close();
    }

    static MutationGroup buildMutationGroup(Mutation m, Mutation ... other) {
        return MutationGroup.create((Mutation)m, (Mutation[])other);
    }

    static Mutation buildUpsertMutation(Long key) {
        return ((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)TABLE_NAME).set("key").to(key)).build();
    }

    private static Iterable<Mutation> buildMutationBatch(Mutation ... m) {
        return Arrays.asList(m);
    }

    private static Mutation buildDeleteMutation(Long ... keys) {
        KeySet.Builder builder = KeySet.newBuilder();
        for (Long key : keys) {
            builder.addKey(Key.of((Object[])new Object[]{key}));
        }
        return Mutation.delete((String)TABLE_NAME, (KeySet)builder.build());
    }

    private static Iterable<Mutation> mutationsInNoOrder(Iterable<Mutation> expected) {
        final ImmutableSet mutations = ImmutableSet.copyOf(expected);
        return (Iterable)Mockito.argThat((ArgumentMatcher)new ArgumentMatcher<Iterable<Mutation>>(){

            public boolean matches(Iterable<Mutation> argument) {
                if (!(argument instanceof Iterable)) {
                    return false;
                }
                ImmutableSet actual = ImmutableSet.copyOf(argument);
                return actual.equals((Object)mutations);
            }

            public String toString() {
                return "Iterable must match " + mutations;
            }
        });
    }

    private void verifyTableWriteRequestMetricWasSet(SpannerConfig config, String table, String status, long count) {
        HashMap<String, String> baseLabels = this.getBaseMetricsLabels(config);
        baseLabels.put("METHOD", "Write");
        baseLabels.put("TABLE_ID", table);
        baseLabels.put("RESOURCE", GcpResourceIdentifiers.spannerTable((String)baseLabels.get("SPANNER_PROJECT_ID"), (String)((String)config.getInstanceId().get()), (String)((String)config.getDatabaseId().get()), (String)table));
        baseLabels.put("STATUS", status);
        MonitoringInfoMetricName name = MonitoringInfoMetricName.named((String)MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
        MetricsContainerImpl container = (MetricsContainerImpl)MetricsEnvironment.getCurrentContainer();
        Assert.assertEquals((long)count, (long)container.getCounter((MetricName)name).getCumulative());
    }

    private HashMap<String, String> getBaseMetricsLabels(SpannerConfig config) {
        HashMap<String, String> baseLabels = new HashMap<String, String>();
        baseLabels.put("PTRANSFORM", "");
        baseLabels.put("SERVICE", "Spanner");
        baseLabels.put("SPANNER_PROJECT_ID", config.getProjectId() == null || config.getProjectId().get() == null ? SpannerOptions.getDefaultProjectId() : (String)config.getProjectId().get());
        baseLabels.put("SPANNER_INSTANCE_ID", (String)config.getInstanceId().get());
        baseLabels.put("SPANNER_DATABASE_ID", (String)config.getDatabaseId().get());
        return baseLabels;
    }
}

