package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.class */
public class BigQueryIOStorageReadTest {
    // Pipeline options; assigned inside the folderThenPipeline rule before each test body runs.
    private transient PipelineOptions options;
    // Test pipeline built from `options` inside the folderThenPipeline rule.
    private transient TestPipeline p;
    // JSON text of the Avro record schema ("name": string, "number": long) used for fake read sessions.
    private static final String AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"number\", \"type\": \"long\"}\n ]\n}";
    // Parsed form of AVRO_SCHEMA_STRING, used to build test records and serialized responses.
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
    // BigQuery table schema with two REQUIRED fields: "name" (STRING) and "number" (INTEGER).
    private static final TableSchema TABLE_SCHEMA = new TableSchema().setFields(ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"), new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));
    // Shared Avro encoder factory used by createResponse to serialize records.
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
    // Temporary folder whose root path becomes the pipeline temp location; it is not annotated
    // with @Rule itself but is applied by the folderThenPipeline rule.
    private transient TemporaryFolder testFolder = new TemporaryFolder();

    /**
     * Composite rule that nests pipeline setup inside the temporary-folder rule: the folder rule
     * wraps a statement that builds the options (project "project-id", temp location set to the
     * folder root), creates the {@link TestPipeline} and then runs the test through it.
     */
    @Rule
    public final transient TestRule folderThenPipeline = new TestRule() {
        @Override
        public Statement apply(final Statement base, final Description description) {
            Statement pipelineStatement = new Statement() {
                @Override
                public void evaluate() throws Throwable {
                    options = TestPipeline.testingPipelineOptions();
                    BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
                    bigQueryOptions.setProject("project-id");
                    // The folder root is read here, i.e. while the folder rule's statement is running.
                    bigQueryOptions.setTempLocation(testFolder.getRoot().getAbsolutePath());
                    p = TestPipeline.fromOptions(options);
                    p.apply(base, description).evaluate();
                }
            };
            return testFolder.apply(pipelineStatement, description);
        }
    };

    // Rule used by tests to declare the exception type and message they expect to be thrown.
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    // Fake dataset service in which tests register datasets and tables before building sources.
    private FakeDatasetService fakeDatasetService = new FakeDatasetService();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest$ParseKeyValue.class */
    private static final class ParseKeyValue implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
        private ParseKeyValue() {
        }

        public KV<String, Long> apply(SchemaAndRecord schemaAndRecord) {
            return KV.of(schemaAndRecord.getRecord().get("name").toString(), (Long) schemaAndRecord.getRecord().get("number"));
        }
    }

    /** Runs before every test and invokes the static {@code FakeDatasetService.setUp()} hook. */
    @Before
    public void setUp() throws Exception {
        FakeDatasetService.setUp();
    }

    /**
     * Builds a DIRECT_READ read from a fully qualified table spec and checks the parsed table
     * reference and that validation is enabled.
     */
    @Test
    public void testBuildTableBasedSource() {
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table");

        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertTrue(typedRead.getValidate());
    }

    /**
     * Same as the basic table-based build, but with {@code withoutValidation()} applied; checks
     * that the validate flag is then false.
     */
    @Test
    public void testBuildTableBasedSourceWithoutValidation() {
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table")
                        .withoutValidation();

        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertFalse(typedRead.getValidate());
    }

    /** A table spec without a project part must yield a table reference whose project id is null. */
    @Test
    public void testBuildTableBasedSourceWithDefaultProject() {
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("myDataset.myTable");

        checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
    }

    /**
     * Builds a DIRECT_READ read with explicit table read options and checks that the same options
     * are reported back by {@code getReadOptions()}.
     */
    @Test
    public void testBuildTableBasedSourceWithReadOptions() {
        ReadOptions.TableReadOptions readOptions =
                ReadOptions.TableReadOptions.newBuilder()
                        .addSelectedFields("field1")
                        .addSelectedFields("field2")
                        .setRowRestriction("int_field > 5")
                        .build();
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table")
                        .withReadOptions(readOptions);

        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        // Expected value goes first so that a failure message labels the two values correctly.
        Assert.assertEquals(readOptions, typedRead.getReadOptions());
    }

    /** Builds the read from an explicit {@link TableReference} instead of a table-spec string. */
    @Test
    public void testBuildTableBasedSourceWithTableReference() {
        TableReference tableRef =
                new TableReference()
                        .setProjectId("foo.com:project")
                        .setDatasetId("dataset")
                        .setTableId("table");
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from(tableRef);

        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
    }

    /**
     * Asserts that the read targets the given table, has no query set, and uses DIRECT_READ.
     *
     * @param typedRead the read transform under inspection
     * @param project expected project id of the table reference (may be null)
     * @param dataset expected dataset id of the table reference
     * @param table expected table id of the table reference
     */
    private void checkTypedReadTableObject(BigQueryIO.TypedRead typedRead, String project, String dataset, String table) {
        TableReference actualTable = typedRead.getTable();
        Assert.assertEquals(project, actualTable.getProjectId());
        Assert.assertEquals(dataset, actualTable.getDatasetId());
        Assert.assertEquals(table, actualTable.getTableId());
        Assert.assertNull(typedRead.getQuery());
        Assert.assertEquals(BigQueryIO.TypedRead.Method.DIRECT_READ, typedRead.getMethod());
    }

    /**
     * Applying a table-based read that also sets a result-flattening preference is expected to
     * fail with an {@link IllegalArgumentException} carrying the message asserted below.
     */
    @Test
    public void testBuildSourceWithTableAndFlatten() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a result flattening preference, which only applies to queries");

        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table")
                        .withoutResultFlattening();
        p.apply("ReadMyTable", typedRead);
        p.run();
    }

    /**
     * Applying a table-based read that also sets a SQL dialect preference is expected to fail
     * with an {@link IllegalArgumentException} carrying the message asserted below.
     */
    @Test
    public void testBuildSourceWithTableAndSqlDialect() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference, which only applies to queries");

        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table")
                        .usingStandardSql();
        p.apply("ReadMyTable", typedRead);
        p.run();
    }

    /** The read's display data must contain a "table" item holding the full table spec. */
    @Test
    public void testDisplayData() {
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table");

        DisplayData displayData = DisplayData.from(typedRead);
        MatcherAssert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("table", "foo.com:project:dataset.table"));
    }

    /**
     * The display data collected for the read's primitive source transforms must include an
     * entry with a "table" item.
     */
    @Test
    public void testEvaluatedDisplayData() {
        DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table");

        MatcherAssert.assertThat(
                evaluator.displayDataForPrimitiveSourceTransforms(typedRead),
                Matchers.hasItem(DisplayDataMatchers.hasDisplayItem("table")));
    }

    /** The transform's name must be "BigQueryIO.TypedRead". */
    @Test
    public void testName() {
        BigQueryIO.TypedRead<TableRow> typedRead =
                BigQueryIO.read(new BigQueryIO.TableRowParser())
                        .withCoder(TableRowJsonCoder.of())
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        .from("foo.com:project:dataset.table");

        String transformName = typedRead.getName();
        Assert.assertEquals("BigQueryIO.TypedRead", transformName);
    }

    /**
     * With no explicit coder, the coder inferred for a parse function returning
     * {@code KV<ByteString, ReadSession>} must equal a KvCoder of ByteStringCoder and ProtoCoder.
     */
    @Test
    public void testCoderInference() {
        // The function body is irrelevant; only its generic signature is inspected.
        SerializableFunction<SchemaAndRecord, KV<ByteString, Storage.ReadSession>> parseFn =
                new SerializableFunction<SchemaAndRecord, KV<ByteString, Storage.ReadSession>>() {
                    @Override
                    public KV<ByteString, Storage.ReadSession> apply(SchemaAndRecord input) {
                        return null;
                    }
                };
        KvCoder<ByteString, Storage.ReadSession> expectedCoder =
                KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(Storage.ReadSession.class));

        Assert.assertEquals(expectedCoder, BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
    }

    /** Runs the estimated-size check against a table that has no streaming buffer set. */
    @Test
    public void testTableSourceEstimatedSize() throws Exception {
        final boolean withStreamingBuffer = false;
        doTableSourceEstimatedSizeTest(withStreamingBuffer);
    }

    /** Runs the estimated-size check against a table that also has a streaming buffer set. */
    @Test
    public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
        final boolean withStreamingBuffer = true;
        doTableSourceEstimatedSizeTest(withStreamingBuffer);
    }

    /**
     * Registers a 100-byte table in the fake dataset service and asserts that a table source
     * built on it reports an estimated size of exactly 100 bytes.
     *
     * @param withStreamingBuffer when true, the table additionally carries a streaming buffer
     *     with an estimated size of ten bytes, which must not change the asserted result
     */
    private void doTableSourceEstimatedSizeTest(boolean withStreamingBuffer) throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");

        Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
        if (withStreamingBuffer) {
            table.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.TEN));
        }
        fakeDatasetService.createTable(table);

        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(tableRef),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices().withDatasetService(fakeDatasetService));
        Assert.assertEquals(100L, tableSource.getEstimatedSizeBytes(options));
    }

    /**
     * The table is registered under "project-id" (the project configured in the pipeline
     * options) while the source is built from a spec without a project; the estimated size must
     * still be the table's 100 bytes.
     */
    @Test
    public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception {
        fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference qualifiedTableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
        Table table = new Table().setTableReference(qualifiedTableRef).setNumBytes(100L);
        fakeDatasetService.createTable(table);

        TableReference unqualifiedTableRef = BigQueryHelpers.parseTableSpec("dataset.table");
        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(unqualifiedTableRef),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices().withDatasetService(fakeDatasetService));
        Assert.assertEquals(100L, tableSource.getEstimatedSizeBytes(options));
    }

    /** Splits with a bundle size of 1024 and expects 1024 streams to be requested and returned. */
    @Test
    public void testTableSourceInitialSplit() throws Exception {
        final long bundleSize = 1024L;
        final int expectedStreamCount = 1024;
        doTableSourceInitialSplitTest(bundleSize, expectedStreamCount);
    }

    /** Splits with a bundle size of 1048576 and expects 10 streams to be requested and returned. */
    @Test
    public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
        final long bundleSize = 1048576L;
        final int expectedStreamCount = 10;
        doTableSourceInitialSplitTest(bundleSize, expectedStreamCount);
    }

    /** Splits with a bundle size of 10 and expects 10000 streams to be requested and returned. */
    @Test
    public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
        final long bundleSize = 10L;
        final int expectedStreamCount = 10000;
        doTableSourceInitialSplitTest(bundleSize, expectedStreamCount);
    }

    /**
     * Registers a 1048576-byte table, stubs a storage client so that a read-session request for
     * {@code streamCount} streams returns a session with exactly that many streams, and asserts
     * that splitting the table source yields {@code streamCount} sources.
     *
     * @param bundleSize desired bundle size passed to {@code split}
     * @param streamCount number of streams the source is expected to request, and the expected
     *     number of split results
     */
    private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(1048576L).setSchema(new TableSchema());
        fakeDatasetService.createTable(table);

        Storage.CreateReadSessionRequest expectedRequest =
                Storage.CreateReadSessionRequest.newBuilder()
                        .setParent("projects/project-id")
                        .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
                        .setRequestedStreams(streamCount)
                        .build();

        Storage.ReadSession.Builder sessionBuilder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < streamCount; i++) {
            sessionBuilder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }

        // Keep the mock in a named local: the same instance must be stubbed and then handed to
        // the fake services.
        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(tableRef),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices()
                                .withDatasetService(fakeDatasetService)
                                .withStorageClient(fakeStorageClient));

        Assert.assertEquals(streamCount, tableSource.split(bundleSize, options).size());
    }

    /**
     * Splits a table source configured with table read options (selected fields plus a row
     * restriction). The stubbed storage client only answers a read-session request that carries
     * those same options, so getting 10 sources back shows the options were forwarded.
     */
    @Test
    public void testTableSourceInitialSplit_WithTableReadOptions() throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        TableSchema tableSchema =
                new TableSchema()
                        .setFields(
                                ImmutableList.of(
                                        new TableFieldSchema().setName("name").setType("STRING"),
                                        new TableFieldSchema().setName("number").setType("INTEGER")));
        fakeDatasetService.createTable(new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(tableSchema));

        // The same options object is used for the expected request and for the source under test.
        ReadOptions.TableReadOptions readOptions =
                ReadOptions.TableReadOptions.newBuilder()
                        .addSelectedFields("name")
                        .addSelectedFields("number")
                        .setRowRestriction("number > 5")
                        .build();

        Storage.CreateReadSessionRequest expectedRequest =
                Storage.CreateReadSessionRequest.newBuilder()
                        .setParent("projects/project-id")
                        .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
                        .setRequestedStreams(10)
                        .setReadOptions(readOptions)
                        .build();

        Storage.ReadSession.Builder sessionBuilder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < 10; i++) {
            sessionBuilder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(tableRef),
                        readOptions,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices()
                                .withDatasetService(fakeDatasetService)
                                .withStorageClient(fakeStorageClient));

        Assert.assertEquals(10L, tableSource.split(10L, options).size());
    }

    /**
     * Splits a source built from a table spec without a project. The expected read-session
     * request references the table under "project-id" (the project set in the pipeline options)
     * and the stubbed session carries 50 streams, so the split must yield 50 sources.
     */
    @Test
    public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
        fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference qualifiedTableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
        Table table = new Table().setTableReference(qualifiedTableRef).setNumBytes(1048576L).setSchema(new TableSchema());
        fakeDatasetService.createTable(table);

        Storage.CreateReadSessionRequest expectedRequest =
                Storage.CreateReadSessionRequest.newBuilder()
                        .setParent("projects/project-id")
                        .setTableReference(BigQueryHelpers.toTableRefProto(qualifiedTableRef))
                        .setRequestedStreams(1024)
                        .build();

        // The session deliberately returns fewer streams (50) than were requested (1024).
        Storage.ReadSession.Builder sessionBuilder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < 50; i++) {
            sessionBuilder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices()
                                .withDatasetService(fakeDatasetService)
                                .withStorageClient(fakeStorageClient));

        Assert.assertEquals(50L, tableSource.split(1024L, options).size());
    }

    /** When the stubbed read session contains no streams, splitting must return an empty list. */
    @Test
    public void testTableSourceInitialSplit_EmptyTable() throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(1048576L).setSchema(new TableSchema());
        fakeDatasetService.createTable(table);

        Storage.CreateReadSessionRequest expectedRequest =
                Storage.CreateReadSessionRequest.newBuilder()
                        .setParent("projects/project-id")
                        .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
                        .setRequestedStreams(1024)
                        .build();
        Storage.ReadSession emptySession = Storage.ReadSession.newBuilder().build();

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(emptySession);

        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(tableRef),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices()
                                .withDatasetService(fakeDatasetService)
                                .withStorageClient(fakeStorageClient));

        Assert.assertTrue(tableSource.split(1024L, options).isEmpty());
    }

    /**
     * Creating a reader directly on the (unsplit) table source is expected to throw an
     * {@link UnsupportedOperationException} with the message asserted below.
     */
    @Test
    public void testTableSourceCreateReader() throws Exception {
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        BigQueryStorageTableSource<TableRow> tableSource =
                BigQueryStorageTableSource.create(
                        ValueProvider.StaticValueProvider.of(tableRef),
                        (ReadOptions.TableReadOptions) null,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices().withDatasetService(fakeDatasetService));

        thrown.expect(UnsupportedOperationException.class);
        thrown.expectMessage("BigQuery storage source must be split before reading");
        tableSource.createReader(options);
    }

    /**
     * Builds an Avro record for the given schema with its "name" and "number" fields populated.
     *
     * @param str value stored in the "name" field
     * @param j value stored (boxed as {@code Long}) in the "number" field
     * @param schema Avro schema the record is created for
     * @return the populated record
     */
    private static GenericRecord createRecord(String str, long j, Schema schema) {
        GenericRecord row = new GenericData.Record(schema);
        row.put("name", str);
        row.put("number", j);
        return row;
    }

    /**
     * Serializes the given records with Avro binary encoding and wraps the bytes in a
     * {@code ReadRowsResponse} whose row count equals the number of records.
     *
     * @param schema Avro schema used by the datum writer
     * @param collection records to serialize, written in iteration order
     * @return a response carrying the serialized rows and their count
     */
    private static Storage.ReadRowsResponse createResponse(Schema schema, Collection<GenericRecord> collection) throws Exception {
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        BinaryEncoder encoder = ENCODER_FACTORY.binaryEncoder(buffer, null);
        for (GenericRecord genericRecord : collection) {
            datumWriter.write(genericRecord, encoder);
        }
        // Flush before reading the buffer so that all encoded bytes have reached it.
        encoder.flush();

        AvroProto.AvroRows avroRows =
                AvroProto.AvroRows.newBuilder()
                        .setSerializedBinaryRows(ByteString.copyFrom(buffer.toByteArray()))
                        .setRowCount(collection.size())
                        .build();
        return Storage.ReadRowsResponse.newBuilder().setAvroRows(avroRows).build();
    }

    /** A stream source built from default session and stream instances must report a size of 0. */
    @Test
    public void testStreamSourceEstimatedSizeBytes() throws Exception {
        BigQueryStorageStreamSource<TableRow> streamSource =
                BigQueryStorageStreamSource.create(
                        Storage.ReadSession.getDefaultInstance(),
                        Storage.Stream.getDefaultInstance(),
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices());

        Assert.assertEquals(0L, streamSource.getEstimatedSizeBytes(options));
    }

    /** Splitting a stream source must return exactly the source itself. */
    @Test
    public void testStreamSourceSplit() throws Exception {
        OffsetBasedSource streamSource =
                BigQueryStorageStreamSource.create(
                        Storage.ReadSession.getDefaultInstance(),
                        Storage.Stream.getDefaultInstance(),
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices());

        MatcherAssert.assertThat(
                streamSource.split(0L, options),
                Matchers.containsInAnyOrder(new OffsetBasedSource[]{streamSource}));
    }

    /**
     * {@code getMaxEndOffset} on a stream source is expected to throw an
     * {@link UnsupportedOperationException} with the message "Not implemented".
     */
    @Test
    public void testStreamSourceGetMaxEndOffset() throws Exception {
        BigQueryStorageStreamSource<TableRow> streamSource =
                BigQueryStorageStreamSource.create(
                        Storage.ReadSession.getDefaultInstance(),
                        Storage.Stream.getDefaultInstance(),
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices());

        thrown.expect(UnsupportedOperationException.class);
        thrown.expectMessage("Not implemented");
        streamSource.getMaxEndOffset(options);
    }

    /**
     * {@code createSourceForSubrange} on a stream source is expected to throw an
     * {@link UnsupportedOperationException} with the message "Not implemented".
     */
    @Test
    public void testStreamSourceCreateSouceForSubrange() throws Exception {
        BigQueryStorageStreamSource<TableRow> streamSource =
                BigQueryStorageStreamSource.create(
                        Storage.ReadSession.getDefaultInstance(),
                        Storage.Stream.getDefaultInstance(),
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices());

        thrown.expect(UnsupportedOperationException.class);
        thrown.expectMessage("Not implemented");
        streamSource.createSourceForSubrange(0L, 0L);
    }

    /**
     * Reads a stream source end to end against a stubbed storage client that returns three
     * records split over two responses, and checks that the reader yields three rows.
     */
    @Test
    public void testReadFromStreamSource() throws Exception {
        Storage.ReadSession readSession =
                Storage.ReadSession.newBuilder()
                        .setName("readSession")
                        .setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                        .build();
        Storage.Stream stream = Storage.Stream.newBuilder().setName("stream").build();
        Storage.ReadRowsRequest expectedRequest =
                Storage.ReadRowsRequest.newBuilder()
                        .setReadPosition(Storage.StreamPosition.newBuilder().setStream(stream))
                        .build();

        ArrayList<GenericRecord> records =
                Lists.newArrayList(
                        createRecord("A", 1L, AVRO_SCHEMA),
                        createRecord("B", 2L, AVRO_SCHEMA),
                        createRecord("C", 3L, AVRO_SCHEMA));
        // Two responses: the first carries records A and B, the second carries record C.
        ArrayList<Storage.ReadRowsResponse> responses =
                Lists.newArrayList(
                        createResponse(AVRO_SCHEMA, records.subList(0, 2)),
                        createResponse(AVRO_SCHEMA, records.subList(2, 3)));

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.readRows(expectedRequest)).thenReturn(responses);

        BigQueryStorageStreamSource<TableRow> streamSource =
                BigQueryStorageStreamSource.create(
                        readSession,
                        stream,
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices().withStorageClient(fakeStorageClient));

        ArrayList<TableRow> rows = new ArrayList<>();
        // Close the reader once iteration is done; the original never released it.
        try (BigQueryStorageStreamSource.BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(options)) {
            for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
                rows.add(reader.getCurrent());
            }
        }

        Assert.assertEquals(3L, rows.size());
    }

    /**
     * After the reader has been started, {@code splitAtFraction(0.5)} on a stream source reader
     * must return null.
     */
    @Test
    public void testStreamSourceSplitAtFraction() throws Exception {
        Storage.ReadSession readSession =
                Storage.ReadSession.newBuilder()
                        .setName("readSession")
                        .setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                        .build();
        Storage.Stream stream = Storage.Stream.newBuilder().setName("stream").build();
        Storage.ReadRowsRequest expectedRequest =
                Storage.ReadRowsRequest.newBuilder()
                        .setReadPosition(Storage.StreamPosition.newBuilder().setStream(stream))
                        .build();

        ArrayList<GenericRecord> records =
                Lists.newArrayList(
                        createRecord("A", 1L, AVRO_SCHEMA),
                        createRecord("B", 2L, AVRO_SCHEMA),
                        createRecord("C", 3L, AVRO_SCHEMA));
        ArrayList<Storage.ReadRowsResponse> responses =
                Lists.newArrayList(
                        createResponse(AVRO_SCHEMA, records.subList(0, 2)),
                        createResponse(AVRO_SCHEMA, records.subList(2, 3)));

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.readRows(expectedRequest)).thenReturn(responses);

        BigQueryStorageStreamSource<TableRow> streamSource =
                BigQueryStorageStreamSource.create(
                        readSession,
                        stream,
                        TABLE_SCHEMA,
                        new BigQueryIO.TableRowParser(),
                        TableRowJsonCoder.of(),
                        new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(options);

        reader.start();
        Assert.assertNull(reader.splitAtFraction(0.5d));
    }

    /**
     * End-to-end pipeline test: a DIRECT_READ read using {@code ParseKeyValue} runs against a
     * serializable mock storage client that serves one stream with four records in two
     * responses, and the output must contain the four matching key/value pairs.
     */
    @Test
    public void testReadFromBigQueryIO() throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(new TableSchema());
        fakeDatasetService.createTable(table);

        Storage.CreateReadSessionRequest expectedSessionRequest =
                Storage.CreateReadSessionRequest.newBuilder()
                        .setParent("projects/project-id")
                        .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
                        .setRequestedStreams(10)
                        .build();
        Storage.ReadSession readSession =
                Storage.ReadSession.newBuilder()
                        .setName("readSessionName")
                        .setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                        .addStreams(Storage.Stream.newBuilder().setName("streamName"))
                        .build();
        Storage.ReadRowsRequest expectedReadRequest =
                Storage.ReadRowsRequest.newBuilder()
                        .setReadPosition(
                                Storage.StreamPosition.newBuilder()
                                        .setStream(Storage.Stream.newBuilder().setName("streamName")))
                        .build();

        ArrayList<GenericRecord> records =
                Lists.newArrayList(
                        createRecord("A", 1L, AVRO_SCHEMA),
                        createRecord("B", 2L, AVRO_SCHEMA),
                        createRecord("C", 3L, AVRO_SCHEMA),
                        createRecord("D", 4L, AVRO_SCHEMA));
        ArrayList<Storage.ReadRowsResponse> responses =
                Lists.newArrayList(
                        createResponse(AVRO_SCHEMA, records.subList(0, 2)),
                        createResponse(AVRO_SCHEMA, records.subList(2, 4)));

        // The mock is created with the serializable setting because it is handed to the
        // transform as part of the test services.
        BigQueryServices.StorageClient fakeStorageClient =
                Mockito.mock(BigQueryServices.StorageClient.class, Mockito.withSettings().serializable());
        Mockito.when(fakeStorageClient.createReadSession(expectedSessionRequest)).thenReturn(readSession);
        Mockito.when(fakeStorageClient.readRows(expectedReadRequest)).thenReturn(responses);

        FakeBigQueryServices fakeServices =
                new FakeBigQueryServices()
                        .withDatasetService(fakeDatasetService)
                        .withStorageClient(fakeStorageClient);

        PAssert.that(
                        p.apply(
                                BigQueryIO.read(new ParseKeyValue())
                                        .from("foo.com:project:dataset.table")
                                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                                        .withTestServices(fakeServices)))
                .containsInAnyOrder(ImmutableList.of(KV.of("A", 1L), KV.of("B", 2L), KV.of("C", 3L), KV.of("D", 4L)));
        p.run();
    }
}
