package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.FakeBatchTransactionId;
import com.google.cloud.spanner.FakePartitionFactory;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for {@link SpannerIO#read()} and {@link SpannerIO#readAll()} running against a mocked
 * {@link BatchReadOnlyTransaction}.
 *
 * <p>Every test builds a read transform, stubs the calls the transform is expected to make on the
 * mocked transaction, runs the pipeline, and then checks both the emitted rows and the
 * {@code API_REQUEST_COUNT} counter found in the current metrics container.
 *
 * <p>"Batch" tests stub the partitioning calls plus {@code execute(Partition)}; "naive" tests
 * disable batching via {@code withBatching(false)} and stub the direct read/query calls instead.
 */
@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.class */
public class SpannerIOReadTest implements Serializable {
    public static final String PROJECT_ID = "1234";
    public static final String INSTANCE_ID = "123";
    public static final String DATABASE_ID = "aaa";
    public static final String TABLE_ID = "users";
    public static final String QUERY_NAME = "My-query";
    public static final String QUERY_STATEMENT = "SELECT * FROM users";

    /** Read timestamp shared by all transforms; {@link #setUp()} stubs the batch client for it. */
    private static final TimestampBound TIMESTAMP_BOUND =
            TimestampBound.ofReadTimestamp(Timestamp.ofTimeMicroseconds(12345));

    /** Row type of {@link #FAKE_ROWS}: an INT64 {@code id} and a STRING {@code name}. */
    private static final Type FAKE_TYPE =
            Type.struct(
                    Type.StructField.of("id", Type.int64()),
                    Type.StructField.of("name", Type.string()));

    /** The six rows every successful read is expected to emit. */
    private static final List<Struct> FAKE_ROWS =
            Arrays.asList(
                    fakeRow(1, "Alice"),
                    fakeRow(2, "Bob"),
                    fakeRow(3, "Carl"),
                    fakeRow(4, "Dan"),
                    fakeRow(5, "Evan"),
                    fakeRow(6, "Floyd"));

    // Abandoned-node enforcement is switched off because the failure tests apply a transform
    // whose output is never consumed.
    @Rule
    public final transient TestPipeline pipeline =
            TestPipeline.create().enableAbandonedNodeEnforcement(false);

    private FakeServiceFactory serviceFactory;
    private BatchReadOnlyTransaction mockBatchTx;
    private Partition fakePartition;
    private SpannerConfig spannerConfig;

    /** Builds one row matching {@link #FAKE_TYPE}. */
    private static Struct fakeRow(long id, String name) {
        return Struct.newBuilder().set("id").to(Value.int64(id)).set("name").to(name).build();
    }

    /**
     * Creates the fake service factory, the mocked batch transaction and the default
     * {@link SpannerConfig}, and installs a fresh metrics container as both the process-wide and
     * the current container so that request counters start from zero in every test.
     */
    @Before
    public void setUp() throws Exception {
        serviceFactory = new FakeServiceFactory();
        mockBatchTx = Mockito.mock(BatchReadOnlyTransaction.class);
        fakePartition = FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one"));
        spannerConfig =
                SpannerConfig.create()
                        .withProjectId(PROJECT_ID)
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                        .withServiceFactory(serviceFactory);

        Mockito.when(mockBatchTx.getBatchTransactionId())
                .thenReturn(new FakeBatchTransactionId("runQueryTest"));
        // The same mocked transaction is returned whether it is requested by timestamp bound or
        // by transaction id.
        Mockito.when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(TIMESTAMP_BOUND))
                .thenReturn(mockBatchTx);
        Mockito.when(
                        serviceFactory
                                .mockBatchClient()
                                .batchReadOnlyTransaction(Matchers.any(BatchTransactionId.class)))
                .thenReturn(mockBatchTx);

        MetricsContainerImpl container = new MetricsContainerImpl((String) null);
        MetricsEnvironment.setProcessWideContainer(container);
        MetricsEnvironment.setCurrentContainer(container);
    }

    // ---------------------------------------------------------------------------------------
    // Batch query
    // ---------------------------------------------------------------------------------------

    @Test
    public void runBatchQueryTestWithSpannerConfig() {
        runBatchQueryTest(queryRead(spannerConfig));
    }

    @Test
    public void runBatchQueryTestWithUnspecifiedProject() {
        runBatchQueryTest(queryRead(configWithUnspecifiedProject()));
    }

    @Test
    public void runBatchQueryTestWithNullProject() {
        runBatchQueryTest(queryRead(configWithNullProject()));
    }

    @Test
    public void runBatchQueryTestWithPriority() {
        SpannerIO.Read read = queryRead(spannerConfig).withHighPriority();
        runBatchQueryTest(read);
        Assert.assertEquals(
                Options.RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get());
    }

    /**
     * Runs {@code read} as a batch query over three fake partitions and checks the rows and the
     * "ok" query request counter.
     */
    private void runBatchQueryTest(SpannerIO.Read read) {
        PCollection<Struct> rows = pipeline.apply("read q", read);
        stubPartitionQuery(fakePartition, fakePartition, fakePartition);
        stubExecuteReturningRowsInThreeSlices();

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        // Expected count is 4 - presumably one partitionQuery call plus one execute per
        // partition.
        verifyQueryRequestMetricWasSet(read.getSpannerConfig(), QUERY_NAME, "ok", 4L);
    }

    @Test
    public void runBatchQueryTestWithFailures() {
        pipeline.apply("read q", queryRead(spannerConfig));
        stubPartitionQuery(fakePartition, fakePartition);
        // The first execute succeeds, every later one fails.
        Mockito.when(mockBatchTx.execute(Matchers.any(Partition.class)))
                .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS))
                .thenThrow(simulatedPermissionDenied());

        assertPipelineFailsWithPermissionDenied();

        Assert.assertTrue(getQueryRequestMetric(spannerConfig, QUERY_NAME, "ok") >= 1);
        verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "permission_denied", 1L);
    }

    // ---------------------------------------------------------------------------------------
    // Naive (non-batched) query
    // ---------------------------------------------------------------------------------------

    @Test
    public void runNaiveQueryTestWithProjectId() {
        runNaiveQueryTest(queryRead(spannerConfig));
    }

    @Test
    public void runNaiveQueryTestWithUnspecifiedProject() {
        runNaiveQueryTest(queryRead(configWithUnspecifiedProject()));
    }

    @Test
    public void runNaiveQueryTestWithNullProject() {
        runNaiveQueryTest(queryRead(configWithNullProject()));
    }

    @Test
    public void runNaiveQueryTestWithPriority() {
        SpannerIO.Read read = queryRead(spannerConfig).withHighPriority();
        runNaiveQueryTest(read);
        Assert.assertEquals(
                Options.RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get());
    }

    /**
     * Runs {@code read} with batching disabled and checks the rows and the "ok" query request
     * counter.
     */
    private void runNaiveQueryTest(SpannerIO.Read read) {
        SpannerIO.Read naiveRead = read.withBatching(false);
        PCollection<Struct> rows = pipeline.apply("read q", naiveRead);
        Mockito.when(
                        mockBatchTx.executeQuery(
                                Matchers.eq(Statement.of(QUERY_STATEMENT)),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        verifyQueryRequestMetricWasSet(naiveRead.getSpannerConfig(), QUERY_NAME, "ok", 1L);
    }

    /** A query without an explicit name is expected to be reported under a hash-derived name. */
    @Test
    public void runNaiveQueryTestWithAnonymousQuery() {
        SpannerIO.Read read =
                SpannerIO.read()
                        .withSpannerConfig(spannerConfig)
                        .withQuery(QUERY_STATEMENT)
                        .withTimestampBound(TIMESTAMP_BOUND)
                        .withHighPriority()
                        .withBatching(false);
        PCollection<Struct> rows = pipeline.apply("read q", read);
        Mockito.when(
                        mockBatchTx.executeQuery(
                                Matchers.eq(Statement.of(QUERY_STATEMENT)),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        String expectedQueryName =
                String.format("UNNAMED_QUERY#%08x", QUERY_STATEMENT.hashCode());
        verifyQueryRequestMetricWasSet(spannerConfig, expectedQueryName, "ok", 1L);
    }

    @Test
    public void runNaiveQueryTestWithFailures() {
        pipeline.apply("read q", queryRead(spannerConfig).withBatching(false));
        Mockito.when(
                        mockBatchTx.executeQuery(
                                Matchers.eq(Statement.of(QUERY_STATEMENT)),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenThrow(simulatedPermissionDenied());

        assertPipelineFailsWithPermissionDenied();

        verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "permission_denied", 1L);
    }

    // ---------------------------------------------------------------------------------------
    // Batch table read
    // ---------------------------------------------------------------------------------------

    @Test
    public void runBatchReadTestWithProjectId() {
        runBatchReadTest(tableRead(spannerConfig));
    }

    @Test
    public void runBatchReadTestWithUnspecifiedProject() {
        runBatchReadTest(tableRead(configWithUnspecifiedProject()));
    }

    @Test
    public void runBatchReadTestWithNullProject() {
        runBatchReadTest(tableRead(configWithNullProject()));
    }

    @Test
    public void runBatchReadTestWithPriority() {
        SpannerIO.Read read = tableRead(spannerConfig).withHighPriority();
        runBatchReadTest(read);
        Assert.assertEquals(
                Options.RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get());
    }

    /**
     * Runs {@code read} as a batch table read over three fake partitions and checks the rows and
     * the "ok" table request counter.
     */
    private void runBatchReadTest(SpannerIO.Read read) {
        PCollection<Struct> rows = pipeline.apply("read q", read);
        stubPartitionRead(fakePartition, fakePartition, fakePartition);
        stubExecuteReturningRowsInThreeSlices();

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        // Expected count is 4 - presumably one partitionRead call plus one execute per partition.
        verifyTableRequestMetricWasSet(read.getSpannerConfig(), TABLE_ID, "ok", 4L);
    }

    @Test
    public void runBatchReadTestWithFailures() {
        pipeline.apply("read q", tableRead(spannerConfig));
        stubPartitionRead(fakePartition);
        Mockito.when(mockBatchTx.execute(Matchers.any(Partition.class)))
                .thenThrow(simulatedPermissionDenied());

        assertPipelineFailsWithPermissionDenied();

        // Partitioning is stubbed to succeed while execute always fails, hence one of each.
        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 1L);
        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "permission_denied", 1L);
    }

    // ---------------------------------------------------------------------------------------
    // Naive (non-batched) table read
    // ---------------------------------------------------------------------------------------

    @Test
    public void runNaiveReadTestWithProjectId() {
        runNaiveReadTest(tableRead(spannerConfig));
    }

    @Test
    public void runNaiveReadTestWithUnspecifiedProject() {
        runNaiveReadTest(tableRead(configWithUnspecifiedProject()));
    }

    @Test
    public void runNaiveReadTestWithNullProject() {
        runNaiveReadTest(tableRead(configWithNullProject()));
    }

    @Test
    public void runNaiveReadTestWithPriority() {
        SpannerIO.Read read = tableRead(spannerConfig).withHighPriority();
        runNaiveReadTest(read);
        Assert.assertEquals(
                Options.RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get());
    }

    /**
     * Runs {@code read} with batching disabled and checks the rows and the "ok" table request
     * counter.
     */
    private void runNaiveReadTest(SpannerIO.Read read) {
        SpannerIO.Read naiveRead = read.withBatching(false);
        PCollection<Struct> rows = pipeline.apply("read q", naiveRead);
        Mockito.when(
                        mockBatchTx.read(
                                Matchers.eq(TABLE_ID),
                                Matchers.eq(KeySet.all()),
                                Matchers.eq(Arrays.asList("id", "name")),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        verifyTableRequestMetricWasSet(naiveRead.getSpannerConfig(), TABLE_ID, "ok", 1L);
    }

    @Test
    public void runNaiveReadTestWithFailures() {
        pipeline.apply("read q", tableRead(spannerConfig).withBatching(false));
        Mockito.when(
                        mockBatchTx.read(
                                Matchers.eq(TABLE_ID),
                                Matchers.eq(KeySet.all()),
                                Matchers.eq(Arrays.asList("id", "name")),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenThrow(simulatedPermissionDenied());

        assertPipelineFailsWithPermissionDenied();

        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "permission_denied", 1L);
    }

    // ---------------------------------------------------------------------------------------
    // Index reads
    // ---------------------------------------------------------------------------------------

    @Test
    public void runBatchReadUsingIndex() {
        // withTimestamp() is applied first and withTimestampBound() afterwards, exactly as in
        // the original call chain.
        SpannerIO.Read read =
                SpannerIO.read()
                        .withTimestamp(Timestamp.now())
                        .withSpannerConfig(spannerConfig)
                        .withTable(TABLE_ID)
                        .withColumns("id", "name")
                        .withIndex("theindex")
                        .withTimestampBound(TIMESTAMP_BOUND);
        PCollection<Struct> rows = pipeline.apply("read q", read);
        Mockito.when(
                        mockBatchTx.partitionReadUsingIndex(
                                Matchers.any(PartitionOptions.class),
                                Matchers.eq(TABLE_ID),
                                Matchers.eq("theindex"),
                                Matchers.eq(KeySet.all()),
                                Matchers.eq(Arrays.asList("id", "name")),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition));
        stubExecuteReturningRowsInThreeSlices();

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 4L);
    }

    @Test
    public void runNaiveReadUsingIndex() {
        SpannerIO.Read read =
                SpannerIO.read()
                        .withTimestamp(Timestamp.now())
                        .withSpannerConfig(spannerConfig)
                        .withTable(TABLE_ID)
                        .withColumns("id", "name")
                        .withIndex("theindex")
                        .withTimestampBound(TIMESTAMP_BOUND)
                        .withBatching(false);
        PCollection<Struct> rows = pipeline.apply("read q", read);
        Mockito.when(
                        mockBatchTx.readUsingIndex(
                                Matchers.eq(TABLE_ID),
                                Matchers.eq("theindex"),
                                Matchers.eq(KeySet.all()),
                                Matchers.eq(Arrays.asList("id", "name")),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 1L);
    }

    // ---------------------------------------------------------------------------------------
    // ReadAll
    // ---------------------------------------------------------------------------------------

    @Test
    public void readAllPipelineWithSpannerReadAllConfiguration() {
        runReadAllPipeline(
                SpannerIO.readAll()
                        .withProjectId(PROJECT_ID)
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                        .withServiceFactory(serviceFactory)
                        .withLowPriority()
                        .withTransaction(
                                pipeline.apply(
                                        "tx",
                                        SpannerIO.createTransaction()
                                                .withSpannerConfig(spannerConfig)
                                                .withTimestampBound(TIMESTAMP_BOUND))));
    }

    @Test
    public void readAllPipelineWithSpannerReadAllConfigurationAsValueProviders() {
        runReadAllPipeline(
                SpannerIO.readAll()
                        .withProjectId(ValueProvider.StaticValueProvider.of(PROJECT_ID))
                        .withInstanceId(ValueProvider.StaticValueProvider.of(INSTANCE_ID))
                        .withDatabaseId(ValueProvider.StaticValueProvider.of(DATABASE_ID))
                        .withServiceFactory(serviceFactory)
                        .withHighPriority()
                        .withTransaction(
                                pipeline.apply(
                                        "tx",
                                        SpannerIO.createTransaction()
                                                .withSpannerConfig(spannerConfig)
                                                .withTimestampBound(TIMESTAMP_BOUND))));
    }

    @Test
    public void readAllPipelineWithSpannerCreationTransactionConfiguration() {
        runReadAllPipeline(
                SpannerIO.readAll()
                        .withSpannerConfig(spannerConfig)
                        .withTransaction(
                                pipeline.apply(
                                        "tx",
                                        SpannerIO.createTransaction()
                                                .withProjectId(PROJECT_ID)
                                                .withInstanceId(INSTANCE_ID)
                                                .withDatabaseId(DATABASE_ID)
                                                .withServiceFactory(serviceFactory)
                                                .withTimestampBound(TIMESTAMP_BOUND))));
    }

    @Test
    public void readAllPipelineWithSpannerCreationTransactionConfigurationAsValueProviders() {
        runReadAllPipeline(
                SpannerIO.readAll()
                        .withSpannerConfig(spannerConfig)
                        .withTransaction(
                                pipeline.apply(
                                        "tx",
                                        SpannerIO.createTransaction()
                                                .withProjectId(
                                                        ValueProvider.StaticValueProvider.of(
                                                                PROJECT_ID))
                                                .withInstanceId(
                                                        ValueProvider.StaticValueProvider.of(
                                                                INSTANCE_ID))
                                                .withDatabaseId(
                                                        ValueProvider.StaticValueProvider.of(
                                                                DATABASE_ID))
                                                .withServiceFactory(serviceFactory)
                                                .withTimestampBound(TIMESTAMP_BOUND))));
    }

    @Test
    public void readAllPipeline() {
        runReadAllPipeline(
                SpannerIO.readAll()
                        .withSpannerConfig(spannerConfig)
                        .withTransaction(
                                pipeline.apply(
                                        "tx",
                                        SpannerIO.createTransaction()
                                                .withSpannerConfig(spannerConfig)
                                                .withTimestampBound(TIMESTAMP_BOUND))));
    }

    /**
     * Feeds one named query operation and one table read operation into {@code readAll}, with the
     * query stubbed to yield two partitions and the table read one, then checks the rows and
     * both request counters.
     */
    private void runReadAllPipeline(SpannerIO.ReadAll readAll) {
        PCollection<Struct> rows =
                pipeline
                        .apply(
                                Create.of(
                                        ReadOperation.create()
                                                .withQuery(QUERY_STATEMENT)
                                                .withQueryName(QUERY_NAME),
                                        ReadOperation.create()
                                                .withTable(TABLE_ID)
                                                .withColumns("id", "name")))
                        .apply("read all", readAll);
        stubPartitionQuery(fakePartition, fakePartition);
        stubPartitionRead(fakePartition);
        stubExecuteReturningRowsInThreeSlices();

        PAssert.that(rows).containsInAnyOrder(FAKE_ROWS);
        pipeline.run();

        verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 2L);
        verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "ok", 3L);
    }

    // ---------------------------------------------------------------------------------------
    // Fixture helpers
    // ---------------------------------------------------------------------------------------

    /** Config that never sets a project id. */
    private SpannerConfig configWithUnspecifiedProject() {
        return SpannerConfig.create()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
                .withServiceFactory(serviceFactory);
    }

    /** Config whose project id is explicitly set to {@code null}. */
    private SpannerConfig configWithNullProject() {
        return SpannerConfig.create()
                .withProjectId((String) null)
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
                .withServiceFactory(serviceFactory);
    }

    /** Named query read of {@link #QUERY_STATEMENT} at {@link #TIMESTAMP_BOUND}. */
    private SpannerIO.Read queryRead(SpannerConfig config) {
        return SpannerIO.read()
                .withSpannerConfig(config)
                .withQuery(QUERY_STATEMENT)
                .withQueryName(QUERY_NAME)
                .withTimestampBound(TIMESTAMP_BOUND);
    }

    /** Table read of columns {@code id} and {@code name} at {@link #TIMESTAMP_BOUND}. */
    private SpannerIO.Read tableRead(SpannerConfig config) {
        return SpannerIO.read()
                .withSpannerConfig(config)
                .withTable(TABLE_ID)
                .withColumns("id", "name")
                .withTimestampBound(TIMESTAMP_BOUND);
    }

    /** Stubs {@code partitionQuery} for {@link #QUERY_STATEMENT} to return {@code partitions}. */
    private void stubPartitionQuery(Partition... partitions) {
        Mockito.when(
                        mockBatchTx.partitionQuery(
                                Matchers.any(PartitionOptions.class),
                                Matchers.eq(Statement.of(QUERY_STATEMENT)),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(Arrays.asList(partitions));
    }

    /** Stubs {@code partitionRead} for the full table read to return {@code partitions}. */
    private void stubPartitionRead(Partition... partitions) {
        Mockito.when(
                        mockBatchTx.partitionRead(
                                Matchers.any(PartitionOptions.class),
                                Matchers.eq(TABLE_ID),
                                Matchers.eq(KeySet.all()),
                                Matchers.eq(Arrays.asList("id", "name")),
                                Matchers.any(Options.ReadQueryUpdateTransactionOption.class)))
                .thenReturn(Arrays.asList(partitions));
    }

    /**
     * Stubs {@code execute(Partition)} so that three consecutive calls return rows 1-2, 3-4 and
     * 5-6 of {@link #FAKE_ROWS} respectively.
     */
    private void stubExecuteReturningRowsInThreeSlices() {
        Mockito.when(mockBatchTx.execute(Matchers.any(Partition.class)))
                .thenReturn(
                        ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)),
                        ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)),
                        ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)));
    }

    /** Creates the PERMISSION_DENIED Spanner exception used by the failure tests. */
    private static Throwable simulatedPermissionDenied() {
        return SpannerExceptionFactory.newSpannerException(
                ErrorCode.PERMISSION_DENIED, "Simulated Failure");
    }

    /** Runs the pipeline and requires it to fail with a {@code PipelineExecutionException}. */
    private void assertPipelineFailsWithPermissionDenied() {
        // NOTE(review): the String argument of assertThrows is only the message reported when
        // the assertion itself fails; it is not compared with the thrown exception's message.
        // Confirm whether the exception text was meant to be verified as well.
        Assert.assertThrows(
                "PERMISSION_DENIED: Simulated Failure",
                Pipeline.PipelineExecutionException.class,
                pipeline::run);
    }

    // ---------------------------------------------------------------------------------------
    // Metric helpers
    // ---------------------------------------------------------------------------------------

    /** Returns the cumulative {@code API_REQUEST_COUNT} counter value for {@code hashMap}. */
    private long getRequestMetricCount(HashMap<String, String> hashMap) {
        return MetricsEnvironment.getCurrentContainer().getCounter(MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, hashMap)).getCumulative().longValue();
    }

    /** Returns the "Read" request count recorded for {@code tableId} with the given status. */
    private long getTableRequestMetric(SpannerConfig spannerConfig, String tableId, String status) {
        HashMap<String, String> labels = getBaseMetricsLabels(spannerConfig);
        labels.put("METHOD", "Read");
        labels.put("TABLE_ID", tableId);
        labels.put(
                "RESOURCE",
                GcpResourceIdentifiers.spannerTable(
                        labels.get("SPANNER_PROJECT_ID"),
                        spannerConfig.getInstanceId().get(),
                        spannerConfig.getDatabaseId().get(),
                        tableId));
        labels.put("STATUS", status);
        return getRequestMetricCount(labels);
    }

    /** Returns the "Read" request count recorded for {@code queryName} with the given status. */
    private long getQueryRequestMetric(
            SpannerConfig spannerConfig, String queryName, String status) {
        HashMap<String, String> labels = getBaseMetricsLabels(spannerConfig);
        labels.put("METHOD", "Read");
        labels.put("SPANNER_QUERY_NAME", queryName);
        labels.put(
                "RESOURCE",
                GcpResourceIdentifiers.spannerQuery(
                        labels.get("SPANNER_PROJECT_ID"),
                        spannerConfig.getInstanceId().get(),
                        spannerConfig.getDatabaseId().get(),
                        queryName));
        labels.put("STATUS", status);
        return getRequestMetricCount(labels);
    }

    /** Asserts that the table request counter for the given status equals {@code expected}. */
    private void verifyTableRequestMetricWasSet(
            SpannerConfig spannerConfig, String tableId, String status, long expected) {
        Assert.assertEquals(expected, getTableRequestMetric(spannerConfig, tableId, status));
    }

    /** Asserts that the query request counter for the given status equals {@code expected}. */
    private void verifyQueryRequestMetricWasSet(
            SpannerConfig spannerConfig, String queryName, String status, long expected) {
        Assert.assertEquals(expected, getQueryRequestMetric(spannerConfig, queryName, status));
    }

    /**
     * Builds the labels shared by table and query metrics. When the config has no project id
     * (missing provider or {@code null} value) the default Spanner project id is used instead.
     */
    @NotNull
    private HashMap<String, String> getBaseMetricsLabels(SpannerConfig spannerConfig) {
        ValueProvider<String> projectIdProvider = spannerConfig.getProjectId();
        String projectId = projectIdProvider == null ? null : projectIdProvider.get();

        HashMap<String, String> labels = new HashMap<>();
        labels.put("PTRANSFORM", "");
        labels.put("SERVICE", "Spanner");
        labels.put(
                "SPANNER_PROJECT_ID",
                projectId == null ? SpannerOptions.getDefaultProjectId() : projectId);
        labels.put("SPANNER_INSTANCE_ID", spannerConfig.getInstanceId().get());
        labels.put("SPANNER_DATABASE_ID", spannerConfig.getDatabaseId().get());
        return labels;
    }
}
