/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnknownFieldSet;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource;
import org.apache.beam.sdk.io.gcp.bigquery.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
import org.mockito.MockSettings;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class BigQueryIOStorageReadTest {
    private transient PipelineOptions options;
    private transient TemporaryFolder testFolder = new TemporaryFolder();
    private transient TestPipeline p;
    @Rule
    public final transient TestRule folderThenPipeline = new TestRule(){

        public Statement apply(final Statement base, final Description description) {
            Statement withPipeline = new Statement(){

                public void evaluate() throws Throwable {
                    BigQueryIOStorageReadTest.this.options = TestPipeline.testingPipelineOptions();
                    ((BigQueryOptions)BigQueryIOStorageReadTest.this.options.as(BigQueryOptions.class)).setProject("project-id");
                    ((BigQueryOptions)BigQueryIOStorageReadTest.this.options.as(BigQueryOptions.class)).setTempLocation(BigQueryIOStorageReadTest.this.testFolder.getRoot().getAbsolutePath());
                    BigQueryIOStorageReadTest.this.p = TestPipeline.fromOptions((PipelineOptions)BigQueryIOStorageReadTest.this.options);
                    BigQueryIOStorageReadTest.this.p.apply(base, description).evaluate();
                }
            };
            return BigQueryIOStorageReadTest.this.testFolder.apply(withPipeline, description);
        }
    };
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    private FakeDatasetService fakeDatasetService = new FakeDatasetService();
    private static final String AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"number\", \"type\": \"long\"}\n ]\n}";
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse("{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"number\", \"type\": \"long\"}\n ]\n}");
    private static final TableSchema TABLE_SCHEMA = new TableSchema().setFields((List)ImmutableList.of((Object)new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"), (Object)new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

    @Before
    public void setUp() throws Exception {
        FakeDatasetService.setUp();
    }

    @Test
    public void testBuildTableBasedSource() {
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table");
        this.checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertTrue((boolean)typedRead.getValidate());
    }

    @Test
    public void testBuildTableBasedSourceWithoutValidation() {
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table").withoutValidation();
        this.checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertFalse((boolean)typedRead.getValidate());
    }

    @Test
    public void testBuildTableBasedSourceWithDefaultProject() {
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("myDataset.myTable");
        this.checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
    }

    @Test
    public void testBuildTableBasedSourceWithReadOptions() {
        ReadOptions.TableReadOptions readOptions = ReadOptions.TableReadOptions.newBuilder().addSelectedFields("field1").addSelectedFields("field2").setRowRestriction("int_field > 5").build();
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table").withReadOptions(readOptions);
        this.checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertEquals((Object)typedRead.getReadOptions(), (Object)readOptions);
    }

    @Test
    public void testBuildTableBasedSourceWithTableReference() {
        TableReference tableReference = new TableReference().setProjectId("foo.com:project").setDatasetId("dataset").setTableId("table");
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from(tableReference);
        this.checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
    }

    private void checkTypedReadTableObject(BigQueryIO.TypedRead typedRead, String project, String dataset, String table) {
        Assert.assertEquals((Object)project, (Object)typedRead.getTable().getProjectId());
        Assert.assertEquals((Object)dataset, (Object)typedRead.getTable().getDatasetId());
        Assert.assertEquals((Object)table, (Object)typedRead.getTable().getTableId());
        Assert.assertNull((Object)typedRead.getQuery());
        Assert.assertEquals((Object)BigQueryIO.TypedRead.Method.DIRECT_READ, (Object)typedRead.getMethod());
    }

    @Test
    public void testBuildSourceWithTableAndFlatten() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a result flattening preference, which only applies to queries");
        this.p.apply("ReadMyTable", (PTransform)BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table").withoutResultFlattening());
        this.p.run();
    }

    @Test
    public void testBuildSourceWithTableAndSqlDialect() {
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference, which only applies to queries");
        this.p.apply("ReadMyTable", (PTransform)BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table").usingStandardSql());
        this.p.run();
    }

    @Test
    public void testDisplayData() {
        String tableSpec = "foo.com:project:dataset.table";
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from(tableSpec);
        DisplayData displayData = DisplayData.from((HasDisplayData)typedRead);
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"table", (String)tableSpec));
    }

    @Test
    public void testEvaluatedDisplayData() {
        DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
        BigQueryIO.TypedRead typedRead = BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table");
        Set displayData = evaluator.displayDataForPrimitiveSourceTransforms((PTransform)typedRead);
        MatcherAssert.assertThat((Object)displayData, (Matcher)Matchers.hasItem((Matcher)DisplayDataMatchers.hasDisplayItem((String)"table")));
    }

    @Test
    public void testName() {
        Assert.assertEquals((Object)"BigQueryIO.TypedRead", (Object)BigQueryIO.read((SerializableFunction)new BigQueryIO.TableRowParser()).withCoder((Coder)TableRowJsonCoder.of()).withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).from("foo.com:project:dataset.table").getName());
    }

    @Test
    public void testCoderInference() {
        SerializableFunction<SchemaAndRecord, KV<ByteString, Storage.ReadSession>> parseFn = new SerializableFunction<SchemaAndRecord, KV<ByteString, Storage.ReadSession>>(){

            public KV<ByteString, Storage.ReadSession> apply(SchemaAndRecord input) {
                return null;
            }
        };
        Assert.assertEquals((Object)KvCoder.of((Coder)ByteStringCoder.of(), (Coder)ProtoCoder.of(Storage.ReadSession.class)), (Object)BigQueryIO.read((SerializableFunction)parseFn).inferCoder(CoderRegistry.createDefault()));
    }

    @Test
    public void testTableSourceEstimatedSize() throws Exception {
        this.doTableSourceEstimatedSizeTest(false);
    }

    @Test
    public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
        this.doTableSourceEstimatedSizeTest(true);
    }

    private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws Exception {
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(100L));
        if (useStreamingBuffer) {
            table.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.TEN));
        }
        this.fakeDatasetService.createTable(table);
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)tableRef), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService));
        Assert.assertEquals((long)100L, (long)tableSource.getEstimatedSizeBytes(this.options));
    }

    @Test
    public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception {
        this.fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"project-id:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(100L));
        this.fakeDatasetService.createTable(table);
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)BigQueryHelpers.parseTableSpec((String)"dataset.table")), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService));
        Assert.assertEquals((long)100L, (long)tableSource.getEstimatedSizeBytes(this.options));
    }

    @Test
    public void testTableSourceInitialSplit() throws Exception {
        this.doTableSourceInitialSplitTest(1024L, 1024);
    }

    @Test
    public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
        this.doTableSourceInitialSplitTest(0x100000L, 10);
    }

    @Test
    public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
        this.doTableSourceInitialSplitTest(10L, 10000);
    }

    private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception {
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(0x100000L)).setSchema(new TableSchema());
        this.fakeDatasetService.createTable(table);
        Storage.CreateReadSessionRequest expectedRequest = Storage.CreateReadSessionRequest.newBuilder().setParent("projects/project-id").setTableReference(BigQueryHelpers.toTableRefProto((TableReference)tableRef)).setRequestedStreams(streamCount).setUnknownFields(UnknownFieldSet.newBuilder().addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2L).build()).build()).build();
        Storage.ReadSession.Builder builder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < streamCount; ++i) {
            builder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.createReadSession(expectedRequest)).thenReturn((Object)builder.build());
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)tableRef), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService).withStorageClient(fakeStorageClient));
        List sources = tableSource.split(bundleSize, this.options);
        Assert.assertEquals((long)streamCount, (long)sources.size());
    }

    @Test
    public void testTableSourceInitialSplit_WithTableReadOptions() throws Throwable {
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(100L)).setSchema(new TableSchema().setFields((List)ImmutableList.of((Object)new TableFieldSchema().setName("name").setType("STRING"), (Object)new TableFieldSchema().setName("number").setType("INTEGER"))));
        this.fakeDatasetService.createTable(table);
        ReadOptions.TableReadOptions readOptions = ReadOptions.TableReadOptions.newBuilder().addSelectedFields("name").addSelectedFields("number").setRowRestriction("number > 5").build();
        Storage.CreateReadSessionRequest expectedRequest = Storage.CreateReadSessionRequest.newBuilder().setParent("projects/project-id").setTableReference(BigQueryHelpers.toTableRefProto((TableReference)tableRef)).setRequestedStreams(10).setReadOptions(readOptions).setUnknownFields(UnknownFieldSet.newBuilder().addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2L).build()).build()).build();
        Storage.ReadSession.Builder builder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < 10; ++i) {
            builder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.createReadSession(expectedRequest)).thenReturn((Object)builder.build());
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)tableRef), (ReadOptions.TableReadOptions)readOptions, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService).withStorageClient(fakeStorageClient));
        List sources = tableSource.split(10L, this.options);
        Assert.assertEquals((long)10L, (long)sources.size());
    }

    @Test
    public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
        this.fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"project-id:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(0x100000L)).setSchema(new TableSchema());
        this.fakeDatasetService.createTable(table);
        Storage.CreateReadSessionRequest expectedRequest = Storage.CreateReadSessionRequest.newBuilder().setParent("projects/project-id").setTableReference(BigQueryHelpers.toTableRefProto((TableReference)tableRef)).setRequestedStreams(1024).setUnknownFields(UnknownFieldSet.newBuilder().addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2L).build()).build()).build();
        Storage.ReadSession.Builder builder = Storage.ReadSession.newBuilder();
        for (int i = 0; i < 50; ++i) {
            builder.addStreams(Storage.Stream.newBuilder().setName("stream-" + i));
        }
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.createReadSession(expectedRequest)).thenReturn((Object)builder.build());
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)BigQueryHelpers.parseTableSpec((String)"dataset.table")), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService).withStorageClient(fakeStorageClient));
        List sources = tableSource.split(1024L, this.options);
        Assert.assertEquals((long)50L, (long)sources.size());
    }

    @Test
    public void testTableSourceInitialSplit_EmptyTable() throws Exception {
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(0x100000L)).setSchema(new TableSchema());
        this.fakeDatasetService.createTable(table);
        Storage.CreateReadSessionRequest expectedRequest = Storage.CreateReadSessionRequest.newBuilder().setParent("projects/project-id").setTableReference(BigQueryHelpers.toTableRefProto((TableReference)tableRef)).setRequestedStreams(1024).setUnknownFields(UnknownFieldSet.newBuilder().addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2L).build()).build()).build();
        Storage.ReadSession emptyReadSession = Storage.ReadSession.newBuilder().build();
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.createReadSession(expectedRequest)).thenReturn((Object)emptyReadSession);
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)tableRef), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService).withStorageClient(fakeStorageClient));
        List sources = tableSource.split(1024L, this.options);
        Assert.assertTrue((boolean)sources.isEmpty());
    }

    @Test
    public void testTableSourceCreateReader() throws Exception {
        BigQueryStorageTableSource tableSource = BigQueryStorageTableSource.create((ValueProvider)ValueProvider.StaticValueProvider.of((Object)BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table")), null, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService));
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("BigQuery storage source must be split before reading");
        tableSource.createReader(this.options);
    }

    private static GenericRecord createRecord(String name, long number, Schema schema) {
        GenericData.Record genericRecord = new GenericData.Record(schema);
        genericRecord.put("name", (Object)name);
        genericRecord.put("number", (Object)number);
        return genericRecord;
    }

    private static Storage.ReadRowsResponse createResponse(Schema schema, Collection<GenericRecord> genericRecords, double fractionConsumed) throws Exception {
        GenericDatumWriter writer = new GenericDatumWriter(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = ENCODER_FACTORY.binaryEncoder((OutputStream)outputStream, null);
        for (GenericRecord genericRecord : genericRecords) {
            writer.write((Object)genericRecord, (Encoder)binaryEncoder);
        }
        binaryEncoder.flush();
        return Storage.ReadRowsResponse.newBuilder().setAvroRows(AvroProto.AvroRows.newBuilder().setSerializedBinaryRows(ByteString.copyFrom((byte[])outputStream.toByteArray())).setRowCount((long)genericRecords.size())).setStatus(Storage.StreamStatus.newBuilder().setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits((float)fractionConsumed)).build()).build())).build();
    }

    private static Storage.ReadRowsResponse createResponse(Schema schema, Collection<GenericRecord> genericRecords) throws Exception {
        return BigQueryIOStorageReadTest.createResponse(schema, genericRecords, 0.0);
    }

    @Test
    public void testStreamSourceEstimatedSizeBytes() throws Exception {
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.getDefaultInstance(), (Storage.Stream)Storage.Stream.getDefaultInstance(), (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices());
        Assert.assertEquals((long)0L, (long)streamSource.getEstimatedSizeBytes(this.options));
    }

    @Test
    public void testStreamSourceSplit() throws Exception {
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.getDefaultInstance(), (Storage.Stream)Storage.Stream.getDefaultInstance(), (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices());
        MatcherAssert.assertThat((Object)streamSource.split(0L, this.options), (Matcher)Matchers.containsInAnyOrder((Object[])new BoundedSource[]{streamSource}));
    }

    @Test
    public void testReadFromStreamSource() throws Exception {
        Storage.ReadSession readSession = Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build();
        Storage.Stream stream = Storage.Stream.newBuilder().setName("stream").build();
        Storage.ReadRowsRequest expectedRequest = Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(stream)).build();
        ArrayList records = Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)});
        ArrayList responses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(0, 2)), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(2, 3))});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(expectedRequest)).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(responses));
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)readSession, (Storage.Stream)stream, (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        ArrayList<TableRow> rows = new ArrayList<TableRow>();
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader = streamSource.createReader(this.options);
        boolean hasNext = reader.start();
        while (hasNext) {
            rows.add((TableRow)reader.getCurrent());
            hasNext = reader.advance();
        }
        System.out.println("Rows: " + rows);
        Assert.assertEquals((long)3L, (long)rows.size());
    }

    @Test
    public void testFractionConsumed() throws Exception {
        Storage.ReadSession readSession = Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build();
        Storage.Stream stream = Storage.Stream.newBuilder().setName("stream").build();
        Storage.ReadRowsRequest expectedRequest = Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(stream)).build();
        ArrayList records = Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("F", 6L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("G", 7L, AVRO_SCHEMA)});
        ArrayList responses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.25), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.5), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.9375)});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(expectedRequest)).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(responses));
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)readSession, (Storage.Stream)stream, (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        ArrayList rows = new ArrayList();
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader = streamSource.createReader(this.options);
        Assert.assertEquals((Object)0.0, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)0.0, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.0, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.25, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.25, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.5, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.5, (Object)reader.getFractionConsumed());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)0.5, (Object)reader.getFractionConsumed());
        Assert.assertFalse((boolean)reader.advance());
        Assert.assertEquals((Object)1.0, (Object)reader.getFractionConsumed());
    }

    @Test
    public void testStreamSourceSplitAtFractionSucceeds() throws Exception {
        Storage.Stream parentStream = Storage.Stream.newBuilder().setName("parent").build();
        ArrayList parentResponses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA)}))});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(parentStream)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses));
        Mockito.when((Object)fakeStorageClient.splitReadStream(Storage.SplitReadStreamRequest.newBuilder().setOriginalStream(parentStream).setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits(0.5f)).build()).build()).build())).thenReturn((Object)Storage.SplitReadStreamResponse.newBuilder().setPrimaryStream(Storage.Stream.newBuilder().setName("primary")).setRemainderStream(Storage.Stream.newBuilder().setName("residual")).build());
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(Storage.Stream.newBuilder().setName("primary")).setOffset(2L)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses.subList(1, 2)));
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(Storage.Stream.newBuilder().setName("residual")).setOffset(0L)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses.subList(2, parentResponses.size())));
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build(), (Storage.Stream)parentStream, (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);
        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader primary = parent;
        BoundedSource residualSource = parent.splitAtFraction(0.5);
        Assert.assertNotNull((Object)residualSource);
        BoundedSource.BoundedReader residual = residualSource.createReader(this.options);
        Assert.assertTrue((boolean)primary.advance());
        Assert.assertEquals((Object)"C", (Object)((TableRow)primary.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)primary.advance());
        Assert.assertTrue((boolean)residual.start());
        Assert.assertEquals((Object)"D", (Object)((TableRow)residual.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)residual.advance());
        Assert.assertEquals((Object)"E", (Object)((TableRow)residual.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)residual.advance());
    }

    @Test
    public void testStreamSourceSplitAtFractionRepeated() throws Exception {
        ArrayList streams = Lists.newArrayList((Object[])new Storage.Stream[]{Storage.Stream.newBuilder().setName("stream1").build(), Storage.Stream.newBuilder().setName("stream2").build(), Storage.Stream.newBuilder().setName("stream3").build()});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream((Storage.Stream)streams.get(0))).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("F", 6L, AVRO_SCHEMA)}))})));
        Mockito.when((Object)fakeStorageClient.splitReadStream(Storage.SplitReadStreamRequest.newBuilder().setOriginalStream((Storage.Stream)streams.get(0)).setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits(0.83f)).build()).build()).build())).thenReturn((Object)Storage.SplitReadStreamResponse.newBuilder().setPrimaryStream((Storage.Stream)streams.get(1)).setRemainderStream(Storage.Stream.newBuilder().setName("ignored")).build());
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream((Storage.Stream)streams.get(1)).setOffset(1L)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA)}))})));
        Mockito.when((Object)fakeStorageClient.splitReadStream(Storage.SplitReadStreamRequest.newBuilder().setOriginalStream((Storage.Stream)streams.get(1)).setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits(0.75f)).build()).build()).build())).thenReturn((Object)Storage.SplitReadStreamResponse.newBuilder().setPrimaryStream((Storage.Stream)streams.get(2)).setRemainderStream(Storage.Stream.newBuilder().setName("ignored")).build());
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream((Storage.Stream)streams.get(2)).setOffset(2L)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA)}))})));
        BigQueryStorageStreamSource source = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build(), (Storage.Stream)((Storage.Stream)streams.get(0)), (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BoundedSource.BoundedReader reader = source.createReader(this.options);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        BoundedSource residualSource = reader.splitAtFraction((double)0.83f);
        Assert.assertNotNull((Object)residualSource);
        Assert.assertEquals((Object)"A", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        residualSource = reader.splitAtFraction(0.75);
        Assert.assertNotNull((Object)residualSource);
        Assert.assertEquals((Object)"B", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)"C", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)"D", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)reader.advance());
    }

    @Test
    public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws Exception {
        Storage.Stream parentStream = Storage.Stream.newBuilder().setName("parent").build();
        ArrayList parentResponses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA)}))});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(parentStream)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses));
        Mockito.when((Object)fakeStorageClient.splitReadStream(Storage.SplitReadStreamRequest.newBuilder().setOriginalStream(parentStream).setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits(0.5f)).build()).build()).build())).thenReturn((Object)Storage.SplitReadStreamResponse.getDefaultInstance());
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build(), (Storage.Stream)parentStream, (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);
        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertNull((Object)parent.splitAtFraction(0.5));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"C", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"D", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"E", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)parent.advance());
    }

    @Test
    public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() throws Exception {
        Storage.Stream parentStream = Storage.Stream.newBuilder().setName("parent").build();
        ArrayList parentResponses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)})), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA)}))});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(parentStream)).build())).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses));
        Mockito.when((Object)fakeStorageClient.splitReadStream(Storage.SplitReadStreamRequest.newBuilder().setOriginalStream(parentStream).setUnknownFields(UnknownFieldSet.newBuilder().addField(2, UnknownFieldSet.Field.newBuilder().addFixed32(Float.floatToIntBits(0.5f)).build()).build()).build())).thenReturn((Object)Storage.SplitReadStreamResponse.newBuilder().setPrimaryStream(Storage.Stream.newBuilder().setName("primary")).setRemainderStream(Storage.Stream.newBuilder().setName("residual")).build());
        Mockito.when((Object)fakeStorageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(Storage.Stream.newBuilder().setName("primary")).setOffset(2L)).build())).thenThrow(new Throwable[]{new FailedPreconditionException("Given row offset is invalid for stream.", (Throwable)new StatusRuntimeException(Status.FAILED_PRECONDITION), (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.FAILED_PRECONDITION), false)});
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((Storage.ReadSession)Storage.ReadSession.newBuilder().setName("readSession").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).build(), (Storage.Stream)parentStream, (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);
        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertNull((Object)parent.splitAtFraction(0.5));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"C", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"D", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"E", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)parent.advance());
    }

    @Test
    public void testReadFromBigQueryIO() throws Exception {
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec((String)"foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(Long.valueOf(10L)).setSchema(new TableSchema());
        this.fakeDatasetService.createTable(table);
        Storage.CreateReadSessionRequest expectedCreateReadSessionRequest = Storage.CreateReadSessionRequest.newBuilder().setParent("projects/project-id").setTableReference(BigQueryHelpers.toTableRefProto((TableReference)tableRef)).setRequestedStreams(10).setUnknownFields(UnknownFieldSet.newBuilder().addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2L).build()).build()).build();
        Storage.ReadSession readSession = Storage.ReadSession.newBuilder().setName("readSessionName").setAvroSchema(AvroProto.AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).addStreams(Storage.Stream.newBuilder().setName("streamName")).build();
        Storage.ReadRowsRequest expectedReadRowsRequest = Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(Storage.Stream.newBuilder().setName("streamName"))).build();
        ArrayList records = Lists.newArrayList((Object[])new GenericRecord[]{BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA), BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA)});
        ArrayList readRowsResponses = Lists.newArrayList((Object[])new Storage.ReadRowsResponse[]{BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(0, 2)), BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(2, 4))});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class, (MockSettings)Mockito.withSettings().serializable());
        Mockito.when((Object)fakeStorageClient.createReadSession(expectedCreateReadSessionRequest)).thenReturn((Object)readSession);
        Mockito.when((Object)fakeStorageClient.readRows(expectedReadRowsRequest)).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(readRowsResponses));
        PCollection output = (PCollection)this.p.apply((PTransform)BigQueryIO.read((SerializableFunction)new ParseKeyValue()).from("foo.com:project:dataset.table").withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ).withTestServices((BigQueryServices)new FakeBigQueryServices().withDatasetService(this.fakeDatasetService).withStorageClient(fakeStorageClient)));
        PAssert.that((PCollection)output).containsInAnyOrder((Iterable)ImmutableList.of((Object)KV.of((Object)"A", (Object)1L), (Object)KV.of((Object)"B", (Object)2L), (Object)KV.of((Object)"C", (Object)3L), (Object)KV.of((Object)"D", (Object)4L)));
        this.p.run();
    }

    private static final class ParseKeyValue
    implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
        private ParseKeyValue() {
        }

        public KV<String, Long> apply(SchemaAndRecord input) {
            return KV.of((Object)input.getRecord().get("name").toString(), (Object)((Long)input.getRecord().get("number")));
        }
    }
}

