/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.StreamStats;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.Text;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource;
import org.apache.beam.sdk.io.gcp.bigquery.ProjectOverride;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class BigQueryIOStorageReadTest {
    // Pipeline options for the running test; assigned inside the folderThenPipeline rule.
    private transient PipelineOptions options;
    // Temporary folder whose root path is used as the pipeline temp location.
    private final transient TemporaryFolder testFolder = new TemporaryFolder();
    // Test pipeline built from `options` inside the folderThenPipeline rule.
    private transient TestPipeline p;
    // Arrow buffer allocator; created in setUp() and closed in teardown().
    private BufferAllocator allocator;
    /**
     * Rule that wraps each test in the temporary folder rule and, inside it, a freshly configured
     * {@link TestPipeline}.
     *
     * <p>The pipeline options get project {@code "project-id"}; when the test carries the
     * {@link ProjectOverride} annotation the BigQuery project is additionally set to
     * {@code "bigquery-project-id"}. The temp location is the temporary folder root, which is why
     * the folder rule must be the outer one.
     */
    @Rule
    public final transient TestRule folderThenPipeline = new TestRule(){

        @Override
        public Statement apply(final Statement base, final Description description) {
            Statement pipelineStatement = new Statement(){

                @Override
                public void evaluate() throws Throwable {
                    BigQueryIOStorageReadTest.this.options = TestPipeline.testingPipelineOptions();
                    // A single BigQueryOptions view is enough: it is backed by the same options object.
                    BigQueryOptions bqOptions = (BigQueryOptions)BigQueryIOStorageReadTest.this.options.as(BigQueryOptions.class);
                    bqOptions.setProject("project-id");
                    boolean overridesProject = description.getAnnotations().stream()
                        .anyMatch(a -> a.annotationType().equals(ProjectOverride.class));
                    if (overridesProject) {
                        bqOptions.setBigQueryProject("bigquery-project-id");
                    }
                    bqOptions.setTempLocation(BigQueryIOStorageReadTest.this.testFolder.getRoot().getAbsolutePath());
                    BigQueryIOStorageReadTest.this.p = TestPipeline.fromOptions((PipelineOptions)BigQueryIOStorageReadTest.this.options);
                    // Delegate to the pipeline rule so the actual test body runs inside it.
                    BigQueryIOStorageReadTest.this.p.apply(base, description).evaluate();
                }
            };
            return BigQueryIOStorageReadTest.this.testFolder.apply(pipelineStatement, description);
        }
    };
    // Expected-exception rule; tests configure it before running code that should throw.
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    // Fake dataset service shared by the tests in this class to register datasets and tables.
    private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
    // Avro record schema (JSON) with a string "name" field and a long "number" field.
    private static final String AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"number\", \"type\": \"long\"}\n ]\n}";
    // Parsed from the string constant so the two can never drift apart.
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
    // Same record schema restricted to the "name" field only.
    private static final String TRIMMED_AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n\"type\": \"record\",\n\"name\": \"RowRecord\",\n\"fields\": [\n    {\"name\": \"name\", \"type\": \"string\"}\n ]\n}";
    // Parsed from the string constant so the two can never drift apart.
    private static final Schema TRIMMED_AVRO_SCHEMA = new Schema.Parser().parse(TRIMMED_AVRO_SCHEMA_STRING);
    // BigQuery table schema with required STRING "name" and required INTEGER "number" columns.
    private static final TableSchema TABLE_SCHEMA = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
        new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));
    // Arrow schema with a UTF-8 "name" field and a signed 64-bit integer "number" field.
    private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA = new org.apache.arrow.vector.types.pojo.Schema(Arrays.asList(BigQueryIOStorageReadTest.field("name", (ArrowType)new ArrowType.Utf8(), new Field[0]), BigQueryIOStorageReadTest.field("number", (ArrowType)new ArrowType.Int(64, true), new Field[0])));
    // Shared Avro encoder factory used when serializing test rows.
    private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
    // Tolerance for floating-point comparisons.
    private static final double DELTA = 1.0E-6;

    /**
     * Runs before each test: invokes {@code FakeDatasetService.setUp()} and creates a new Arrow
     * root allocator with no practical memory limit.
     */
    @Before
    public void setUp() throws Exception {
        FakeDatasetService.setUp();
        allocator = new RootAllocator(Long.MAX_VALUE);
    }

    /** Runs after each test: closes the Arrow allocator created in {@code setUp()}. */
    @After
    public void teardown() {
        allocator.close();
    }

    /**
     * Builds a DIRECT_READ read from a fully qualified table spec, then checks the parsed table
     * reference and that validation is enabled.
     */
    @Test
    public void testBuildTableBasedSource() {
        BigQueryIO.TypedRead<?> typedRead =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("foo.com:project:dataset.table");
        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertTrue(typedRead.getValidate());
    }

    /**
     * Same as the plain table-based build, but with {@code withoutValidation()} applied; the read
     * must then report validation as disabled.
     */
    @Test
    public void testBuildTableBasedSourceWithoutValidation() {
        BigQueryIO.TypedRead<?> typedRead =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("foo.com:project:dataset.table")
                .withoutValidation();
        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
        Assert.assertFalse(typedRead.getValidate());
    }

    /**
     * Builds a read from a table spec without a project part; the resulting table reference is
     * expected to carry a {@code null} project id.
     */
    @Test
    public void testBuildTableBasedSourceWithDefaultProject() {
        BigQueryIO.TypedRead<?> typedRead =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("myDataset.myTable");
        checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
    }

    /**
     * Builds a read from an explicit {@link TableReference} instead of a table spec string and
     * checks that the project, dataset and table ids are carried through.
     */
    @Test
    public void testBuildTableBasedSourceWithTableReference() {
        TableReference tableReference = new TableReference()
            .setProjectId("foo.com:project")
            .setDatasetId("dataset")
            .setTableId("table");
        BigQueryIO.TypedRead<?> typedRead =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from(tableReference);
        checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
    }

    /**
     * Asserts that a read targets the given table, has no query, and uses DIRECT_READ.
     *
     * @param typedRead the read transform under inspection
     * @param project expected project id (may be {@code null})
     * @param dataset expected dataset id
     * @param table expected table id
     */
    private void checkTypedReadTableObject(BigQueryIO.TypedRead<?> typedRead, String project, String dataset, String table) {
        Assert.assertEquals(project, typedRead.getTable().getProjectId());
        Assert.assertEquals(dataset, typedRead.getTable().getDatasetId());
        Assert.assertEquals(table, typedRead.getTable().getTableId());
        Assert.assertNull(typedRead.getQuery());
        Assert.assertEquals(BigQueryIO.TypedRead.Method.DIRECT_READ, typedRead.getMethod());
    }

    /**
     * Applying a table read that also sets {@code withoutResultFlattening()} must fail with an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testBuildSourceWithTableAndFlatten() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a result flattening preference, which only applies to queries");
        p.apply(
            "ReadMyTable",
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("foo.com:project:dataset.table")
                .withoutResultFlattening());
        p.run();
    }

    /**
     * Applying a table read that also sets {@code usingStandardSql()} must fail with an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testBuildSourceWithTableAndSqlDialect() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference, which only applies to queries");
        p.apply(
            "ReadMyTable",
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("foo.com:project:dataset.table")
                .usingStandardSql());
        p.run();
    }

    /** Checks that the display data of a table read exposes the table spec under key "table". */
    @Test
    public void testDisplayData() {
        String tableSpec = "foo.com:project:dataset.table";
        BigQueryIO.TypedRead<?> typedRead =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from(tableSpec);
        DisplayData displayData = DisplayData.from(typedRead);
        MatcherAssert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("table", tableSpec));
    }

    /** Checks that a DIRECT_READ table read reports the name "BigQueryIO.TypedRead". */
    @Test
    public void testName() {
        String transformName =
            BigQueryIO.read(new BigQueryIO.TableRowParser())
                .withCoder(TableRowJsonCoder.of())
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .from("foo.com:project:dataset.table")
                .getName();
        Assert.assertEquals("BigQueryIO.TypedRead", transformName);
    }

    /**
     * Checks that the coder inferred for a parse function returning
     * {@code KV<ByteString, ReadSession>} is a {@code KvCoder} of {@code ByteStringCoder} and a
     * {@code ProtoCoder} for {@code ReadSession}.
     */
    @Test
    public void testCoderInference() {
        // Kept as an anonymous class; presumably inference needs the declared generic types - confirm before converting to a lambda.
        SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>> parseFn = new SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>>(){

            @Override
            public KV<ByteString, ReadSession> apply(SchemaAndRecord input) {
                return null;
            }
        };
        Assert.assertEquals(
            KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(ReadSession.class)),
            BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
    }

    /** Runs the estimated-size scenario for a table without a streaming buffer. */
    @Test
    public void testTableSourceEstimatedSize() throws Exception {
        doTableSourceEstimatedSizeTest(false);
    }

    /**
     * Runs the estimated-size scenario for a table that has a streaming buffer; the expected size
     * is the same as without one.
     */
    @Test
    public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
        doTableSourceEstimatedSizeTest(true);
    }

    /**
     * Registers a 100-byte table in the fake dataset service and asserts that a table source over
     * it estimates its size as 100 bytes.
     *
     * @param useStreamingBuffer when {@code true}, the table also gets a streaming buffer with an
     *     estimated size of 10 bytes, which must not change the expected result
     */
    private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
        if (useStreamingBuffer) {
            Streamingbuffer buffer = new Streamingbuffer().setEstimatedBytes(BigInteger.TEN);
            table.setStreamingBuffer(buffer);
        }
        fakeDatasetService.createTable(table);

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService));

        Assert.assertEquals(100L, tableSource.getEstimatedSizeBytes(options));
    }

    /**
     * With the {@link ProjectOverride} annotation present, a source created from the project-less
     * spec "dataset.table" must find the table registered under "bigquery-project-id" and
     * estimate 100 bytes.
     */
    @Test
    @ProjectOverride
    public void testTableSourceEstimatedSize_WithBigQueryProject() throws Exception {
        fakeDatasetService.createDataset("bigquery-project-id", "dataset", "", "", null);
        TableReference registeredRef = BigQueryHelpers.parseTableSpec("bigquery-project-id:dataset.table");
        fakeDatasetService.createTable(new Table().setTableReference(registeredRef).setNumBytes(100L));

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService));

        Assert.assertEquals(100L, tableSource.getEstimatedSizeBytes(options));
    }

    /**
     * A source created from the project-less spec "dataset.table" must find the table registered
     * under "project-id" and estimate 100 bytes.
     */
    @Test
    public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception {
        fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference registeredRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
        fakeDatasetService.createTable(new Table().setTableReference(registeredRef).setNumBytes(100L));

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService));

        Assert.assertEquals(100L, tableSource.getEstimatedSizeBytes(options));
    }

    /** Initial split with a 1024-byte desired bundle size; expects 1024 streams. */
    @Test
    public void testTableSourceInitialSplit() throws Exception {
        doTableSourceInitialSplitTest(1024L, 1024);
    }

    /** Initial split with a 1 MiB desired bundle size; expects 10 streams. */
    @Test
    public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
        doTableSourceInitialSplitTest(1024L * 1024L, 10);
    }

    /** Initial split with a 10-byte desired bundle size; expects 10000 streams. */
    @Test
    public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
        doTableSourceInitialSplitTest(10L, 10000);
    }

    /**
     * Registers a 1 MiB table, stubs the storage client to answer the expected create-session
     * request with a session of {@code streamCount} streams, and asserts that splitting the table
     * source yields that many sources.
     *
     * @param bundleSize desired bundle size in bytes passed to {@code split}
     * @param streamCount max stream count expected in the request, and number of streams returned
     */
    private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table()
            .setTableReference(tableRef)
            .setNumBytes(1024L * 1024L)
            .setSchema(TABLE_SCHEMA);
        fakeDatasetService.createTable(table);

        // The stub only answers a request that is equal to this one.
        CreateReadSessionRequest expectedRequest = CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(ReadSession.newBuilder().setTable("projects/foo.com:project/datasets/dataset/tables/table"))
            .setMaxStreamCount(streamCount)
            .build();

        ReadSession.Builder sessionBuilder = ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO);
        for (int streamIndex = 0; streamIndex < streamCount; streamIndex++) {
            sessionBuilder.addStreams(ReadStream.newBuilder().setName("stream-" + streamIndex));
        }

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService).withStorageClient(fakeStorageClient));

        List<?> sources = tableSource.split(bundleSize, options);
        Assert.assertEquals(streamCount, sources.size());
    }

    /**
     * Splits a table source configured with selected field "name" and row restriction
     * "number &gt; 5"; the expected create-session request must carry both read options, and the
     * ten streams of the stubbed session must become ten sources.
     */
    @Test
    public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table()
            .setTableReference(tableRef)
            .setNumBytes(100L)
            .setSchema(TABLE_SCHEMA);
        fakeDatasetService.createTable(table);

        // The stub only answers a request that is equal to this one.
        CreateReadSessionRequest expectedRequest = CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(ReadSession.newBuilder()
                .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                .setReadOptions(ReadSession.TableReadOptions.newBuilder().addSelectedFields("name").setRowRestriction("number > 5")))
            .setMaxStreamCount(10)
            .build();

        ReadSession.Builder sessionBuilder = ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO);
        for (int streamIndex = 0; streamIndex < 10; streamIndex++) {
            sessionBuilder.addStreams(ReadStream.newBuilder().setName("stream-" + streamIndex));
        }

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            ValueProvider.StaticValueProvider.of(Lists.newArrayList("name")),
            ValueProvider.StaticValueProvider.of("number > 5"),
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService).withStorageClient(fakeStorageClient));

        List<?> sources = tableSource.split(10L, options);
        Assert.assertEquals(10L, sources.size());
    }

    /**
     * Splits a source created from the project-less spec "dataset.table"; the expected request
     * names the table under "project-id", and the 50 streams of the stubbed session must become
     * 50 sources.
     */
    @Test
    public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
        fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
        TableReference registeredRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
        Table table = new Table()
            .setTableReference(registeredRef)
            .setNumBytes(1024L * 1024L)
            .setSchema(TABLE_SCHEMA);
        fakeDatasetService.createTable(table);

        // The stub only answers a request that is equal to this one.
        CreateReadSessionRequest expectedRequest = CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(ReadSession.newBuilder().setTable("projects/project-id/datasets/dataset/tables/table"))
            .setMaxStreamCount(1024)
            .build();

        ReadSession.Builder sessionBuilder = ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO);
        for (int streamIndex = 0; streamIndex < 50; streamIndex++) {
            sessionBuilder.addStreams(ReadStream.newBuilder().setName("stream-" + streamIndex));
        }

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(sessionBuilder.build());

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService).withStorageClient(fakeStorageClient));

        List<?> sources = tableSource.split(1024L, options);
        Assert.assertEquals(50L, sources.size());
    }

    /**
     * When the storage client answers with a read session that has no streams, splitting the table
     * source must return an empty list.
     */
    @Test
    public void testTableSourceInitialSplit_EmptyTable() throws Exception {
        fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
        Table table = new Table()
            .setTableReference(tableRef)
            .setNumBytes(1024L * 1024L)
            .setSchema(new TableSchema());
        fakeDatasetService.createTable(table);

        // The stub only answers a request that is equal to this one.
        CreateReadSessionRequest expectedRequest = CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(ReadSession.newBuilder().setTable("projects/foo.com:project/datasets/dataset/tables/table"))
            .setMaxStreamCount(1024)
            .build();

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(ReadSession.newBuilder().build());

        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService).withStorageClient(fakeStorageClient));

        List<?> sources = tableSource.split(1024L, options);
        Assert.assertTrue(sources.isEmpty());
    }

    /**
     * Creating a reader directly on an unsplit table source must throw
     * {@link UnsupportedOperationException} with the expected message.
     */
    @Test
    public void testTableSourceCreateReader() throws Exception {
        BigQueryStorageTableSource<?> tableSource = BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")),
            null,
            null,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService));

        thrown.expect(UnsupportedOperationException.class);
        thrown.expectMessage("BigQuery storage source must be split before reading");
        tableSource.createReader(options);
    }

    /**
     * Creates an Avro record for {@code schema} with only the "name" field populated.
     *
     * @param name value stored under "name"
     * @param schema Avro schema of the record
     * @return the new record
     */
    private static GenericRecord createRecord(String name, Schema schema) {
        GenericRecord row = new GenericData.Record(schema);
        row.put("name", name);
        return row;
    }

    /**
     * Creates an Avro record for {@code schema} with the "name" and "number" fields populated.
     *
     * @param name value stored under "name"
     * @param number value stored (boxed) under "number"
     * @param schema Avro schema of the record
     * @return the new record
     */
    private static GenericRecord createRecord(String name, long number, Schema schema) {
        GenericRecord row = new GenericData.Record(schema);
        row.put("name", name);
        row.put("number", number);
        return row;
    }

    /**
     * Serializes an Arrow schema with {@code MessageSerializer} into an in-memory buffer.
     *
     * @param arrowSchema schema to serialize
     * @return the serialized bytes
     * @throws RuntimeException wrapping any {@link IOException} raised while serializing
     */
    private static ByteString serializeArrowSchema(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        WriteChannel channel = new WriteChannel(Channels.newChannel(buffer));
        try {
            MessageSerializer.serialize(channel, arrowSchema);
        } catch (IOException ex) {
            throw new RuntimeException("Failed to serialize arrow schema.", ex);
        }
        return ByteString.copyFrom(buffer.toByteArray());
    }

    /**
     * Builds an Avro-format {@code ReadRowsResponse} containing the given records.
     *
     * <p>The records are binary-encoded back to back with {@code schema}; the row count is set on
     * both the Avro rows message and the response itself.
     *
     * @param schema Avro schema used to encode the records
     * @param genericRecords records to include (may be empty)
     * @param progressAtResponseStart progress value reported for the start of the response
     * @param progressAtResponseEnd progress value reported for the end of the response
     * @return the assembled response
     */
    private static ReadRowsResponse createResponse(Schema schema, Collection<GenericRecord> genericRecords, double progressAtResponseStart, double progressAtResponseEnd) throws Exception {
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream encodedRows = new ByteArrayOutputStream();
        BinaryEncoder encoder = ENCODER_FACTORY.binaryEncoder(encodedRows, null);
        for (GenericRecord row : genericRecords) {
            datumWriter.write(row, encoder);
        }
        // Flush so buffered bytes reach the stream before it is copied.
        encoder.flush();

        AvroRows.Builder avroRows = AvroRows.newBuilder()
            .setSerializedBinaryRows(ByteString.copyFrom(encodedRows.toByteArray()))
            .setRowCount(genericRecords.size());
        StreamStats.Progress.Builder progress = StreamStats.Progress.newBuilder()
            .setAtResponseStart(progressAtResponseStart)
            .setAtResponseEnd(progressAtResponseEnd);
        return ReadRowsResponse.newBuilder()
            .setAvroRows(avroRows)
            .setRowCount(genericRecords.size())
            .setStats(StreamStats.newBuilder().setProgress(progress))
            .build();
    }

    /**
     * Builds an Arrow-format {@code ReadRowsResponse} from parallel lists of names and numbers.
     *
     * <p>Field vector 0 of the schema root is filled from {@code name} and field vector 1 from
     * {@code number}; the resulting record batch is serialized into the response.
     *
     * @param arrowSchema Arrow schema used to create the vectors
     * @param name values for the string column; its size determines the row count
     * @param number values for the 64-bit integer column, indexed in step with {@code name}
     * @param progressAtResponseStart progress value reported for the start of the response
     * @param progressAtResponseEnd progress value reported for the end of the response
     * @return the assembled response
     * @throws RuntimeException wrapping any {@link IOException} raised while serializing the batch
     */
    private ReadRowsResponse createResponseArrow(org.apache.arrow.vector.types.pojo.Schema arrowSchema, List<String> name, List<Long> number, double progressAtResponseStart, double progressAtResponseEnd) {
        com.google.cloud.bigquery.storage.v1.ArrowRecordBatch serializedBatch;
        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, this.allocator)) {
            root.allocateNew();
            root.setRowCount(name.size());
            VarCharVector nameVector = (VarCharVector)root.getFieldVectors().get(0);
            BigIntVector numberVector = (BigIntVector)root.getFieldVectors().get(1);
            for (int row = 0; row < name.size(); row++) {
                long value = number.get(row).longValue();
                numberVector.set(row, value);
                nameVector.set(row, new Text(name.get(row)));
            }

            try (ArrowRecordBatch batch = new VectorUnloader(root).getRecordBatch()) {
                // The inner try keeps the IOException handling scoped to the byte stream only.
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                    WriteChannel channel = new WriteChannel(Channels.newChannel(out));
                    MessageSerializer.serialize(channel, batch);
                    serializedBatch = com.google.cloud.bigquery.storage.v1.ArrowRecordBatch.newBuilder()
                        .setRowCount(batch.getLength())
                        .setSerializedRecordBatch(ByteString.copyFrom(out.toByteArray()))
                        .build();
                } catch (IOException e) {
                    throw new RuntimeException("Error writing to byte array output stream", e);
                }
            }
        }

        StreamStats.Progress.Builder progress = StreamStats.Progress.newBuilder()
            .setAtResponseStart(progressAtResponseStart)
            .setAtResponseEnd(progressAtResponseEnd);
        return ReadRowsResponse.newBuilder()
            .setArrowRecordBatch(serializedBatch)
            .setRowCount(name.size())
            .setStats(StreamStats.newBuilder().setProgress(progress))
            .build();
    }

    /** A stream source built from default session and stream instances must estimate 0 bytes. */
    @Test
    public void testStreamSourceEstimatedSizeBytes() throws Exception {
        BigQueryStorageStreamSource<?> streamSource = BigQueryStorageStreamSource.create(
            ReadSession.getDefaultInstance(),
            ReadStream.getDefaultInstance(),
            TABLE_SCHEMA,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices());

        Assert.assertEquals(0L, streamSource.getEstimatedSizeBytes(options));
    }

    /** Splitting a stream source must return a list containing only the source itself. */
    @Test
    public void testStreamSourceSplit() throws Exception {
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
            ReadSession.getDefaultInstance(),
            ReadStream.getDefaultInstance(),
            TABLE_SCHEMA,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices());

        // Raw types and casts are kept on purpose so matcher overload resolution stays unchanged.
        MatcherAssert.assertThat((Object)streamSource.split(0L, this.options), (Matcher)Matchers.containsInAnyOrder((Object[])new BoundedSource[]{streamSource}));
    }

    /**
     * Reads three Avro records, delivered in two responses by a stubbed storage client, through a
     * stream source reader and checks that all three rows are returned.
     *
     * <p>The reader is opened in a try-with-resources block so it is always closed, and no debug
     * output is written to standard out.
     */
    @Test
    public void testReadFromStreamSource() throws Exception {
        ReadSession readSession = ReadSession.newBuilder()
            .setName("readSession")
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .build();
        // The stub only answers a request that is equal to this one.
        ReadRowsRequest expectedRequest = ReadRowsRequest.newBuilder().setReadStream("readStream").build();

        List<GenericRecord> records = Lists.newArrayList(
            BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
            BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA),
            BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA));
        // First response carries records A and B, the second carries C.
        List<ReadRowsResponse> responses = Lists.newArrayList(
            BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.5),
            BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75));

        BigQueryServices.StorageClient fakeStorageClient = Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when(fakeStorageClient.readRows(expectedRequest, ""))
            .thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream<>(responses));

        BigQueryStorageStreamSource<TableRow> streamSource = BigQueryStorageStreamSource.create(
            readSession,
            ReadStream.newBuilder().setName("readStream").build(),
            TABLE_SCHEMA,
            new BigQueryIO.TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient));

        List<TableRow> rows = new ArrayList<TableRow>();
        try (BigQueryStorageStreamSource.BigQueryStorageStreamReader<TableRow> reader = streamSource.createReader(this.options)) {
            for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
                rows.add(reader.getCurrent());
            }
        }

        Assert.assertEquals(3L, rows.size());
    }

    /**
     * Steps a reader over seven records spread across four responses (one of which carries no
     * rows) and checks the value of {@code getFractionConsumed()} before the read starts, after
     * every row, and once the stream is exhausted.
     */
    @Test
    public void testFractionConsumed() throws Exception {
        final double tolerance = 1.0E-6;

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();
        ReadRowsRequest readRequest = ReadRowsRequest.newBuilder().setReadStream("readStream").build();

        List<GenericRecord> allRecords = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("F", 6L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("G", 7L, AVRO_SCHEMA));
        List<GenericRecord> noRecords = Lists.newArrayList();
        List<ReadRowsResponse> streamResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(0, 2), 0.0, 0.25),
                // A response without rows sits in the middle of the stream.
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, noRecords, 0.25, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(2, 4), 0.3, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(4, 7), 0.7, 1.0));

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object) storageClient.readRows(readRequest, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) streamResponses));

        SerializableFunction parseFn = new BigQueryIO.TableRowParser();
        Coder rowCoder = TableRowJsonCoder.of();
        BigQueryServices services = new FakeBigQueryServices().withStorageClient(storageClient);
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                session,
                ReadStream.newBuilder().setName("readStream").build(),
                TABLE_SCHEMA,
                parseFn,
                rowCoder,
                services);
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader =
                streamSource.createReader(this.options);

        // Nothing consumed before the reader is started.
        double beforeStart = reader.getFractionConsumed();
        Assert.assertEquals(0.0, beforeStart, tolerance);

        Assert.assertTrue(reader.start());
        double afterStart = reader.getFractionConsumed();
        Assert.assertEquals(0.125, afterStart, tolerance);

        // Expected fraction after each of the six remaining successful advance() calls.
        double[] expectedAfterAdvance = {0.25, 0.4, 0.5, 0.8, 0.9, 1.0};
        for (double expected : expectedAfterAdvance) {
            Assert.assertTrue(reader.advance());
            double consumed = reader.getFractionConsumed();
            Assert.assertEquals(expected, consumed, tolerance);
        }

        // Past the last row the reader must report exactly 1.0 (boxed comparison, no tolerance).
        Assert.assertFalse(reader.advance());
        Assert.assertEquals((Object) Double.valueOf(1.0), (Object) reader.getFractionConsumed());
    }

    /**
     * Checks the values reported by {@code getFractionConsumed()} when the reader is asked to
     * split at 0.5 right after its first row: the mocked client answers the split request with a
     * "primaryStream"/"remainderStream" pair and serves the primary stream from offset 1.
     */
    @Test
    public void testFractionConsumedWithSplit() throws Exception {
        final double tolerance = 1.0E-6;

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();

        List<GenericRecord> allRecords = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("F", 6L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("G", 7L, AVRO_SCHEMA));

        // Responses served for the original (parent) stream.
        List<ReadRowsResponse> parentResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(0, 2), 0.0, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(2, 4), 0.3, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(4, 7), 0.8, 0.875));
        // Responses served for the primary stream produced by the split.
        List<ReadRowsResponse> primaryResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(1, 3), 0.25, 0.75),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, allRecords.subList(3, 4), 0.8, 1.0));

        ReadRowsRequest parentRead = ReadRowsRequest.newBuilder().setReadStream("parentStream").build();
        ReadRowsRequest primaryRead =
                ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1L).build();
        SplitReadStreamRequest splitRequest =
                SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build();
        SplitReadStreamResponse splitResponse = SplitReadStreamResponse.newBuilder()
                .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream"))
                .setRemainderStream(ReadStream.newBuilder().setName("remainderStream"))
                .build();

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object) storageClient.readRows(parentRead, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) parentResponses));
        Mockito.when((Object) storageClient.splitReadStream(splitRequest))
                .thenReturn((Object) splitResponse);
        Mockito.when((Object) storageClient.readRows(primaryRead, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) primaryResponses));

        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession) session,
                (ReadStream) ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema) TABLE_SCHEMA,
                (SerializableFunction) new BigQueryIO.TableRowParser(),
                (Coder) TableRowJsonCoder.of(),
                (BigQueryServices) new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader =
                streamSource.createReader(this.options);

        Assert.assertEquals(0.0, (double) reader.getFractionConsumed(), tolerance);
        Assert.assertTrue(reader.start());
        Assert.assertEquals(0.125, (double) reader.getFractionConsumed(), tolerance);

        // The split itself must not move the reported fraction; its result is not needed here.
        reader.splitAtFraction(0.5);
        Assert.assertEquals(0.125, (double) reader.getFractionConsumed(), tolerance);

        // Expected fraction after each remaining successful advance().
        for (double expected : new double[] {0.5, 0.75, 1.0}) {
            Assert.assertTrue(reader.advance());
            Assert.assertEquals(expected, (double) reader.getFractionConsumed(), tolerance);
        }

        Assert.assertFalse(reader.advance());
        Assert.assertEquals(1.0, (double) reader.getFractionConsumed(), tolerance);
    }

    /**
     * Splits a reader at fraction 0.5 after it has returned rows "A" and "B" and verifies that
     * the original reader then yields only "C" while a reader created from the residual source
     * yields "D" and "E".
     */
    @Test
    public void testStreamSourceSplitAtFractionSucceeds() throws Exception {
        List<GenericRecord> firstBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA));
        List<GenericRecord> secondBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA));
        List<GenericRecord> thirdBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA));
        List<ReadRowsResponse> parentResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, firstBatch, 0.0, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, secondBatch, 0.25, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, thirdBatch, 0.5, 0.75));

        SplitReadStreamRequest splitRequest =
                SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build();
        SplitReadStreamResponse splitResponse = SplitReadStreamResponse.newBuilder()
                .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream"))
                .setRemainderStream(ReadStream.newBuilder().setName("remainderStream"))
                .build();

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) parentResponses));
        Mockito.when((Object) storageClient.splitReadStream(splitRequest))
                .thenReturn((Object) splitResponse);
        // The primary stream is read from offset 2 and serves only the middle response.
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(2L).build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream(
                        parentResponses.subList(1, 2)));
        // The remainder stream serves everything after the middle response.
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream(
                        parentResponses.subList(2, parentResponses.size())));

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession) session,
                (ReadStream) ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema) TABLE_SCHEMA,
                (SerializableFunction) new BigQueryIO.TableRowParser(),
                (Coder) TableRowJsonCoder.of(),
                (BigQueryServices) new FakeBigQueryServices().withStorageClient(storageClient));

        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent =
                streamSource.createReader(this.options);
        Assert.assertTrue(parent.start());
        Assert.assertEquals((Object) "A", ((TableRow) parent.getCurrent()).get((Object) "name"));
        Assert.assertTrue(parent.advance());
        Assert.assertEquals((Object) "B", ((TableRow) parent.getCurrent()).get((Object) "name"));

        // After a successful split the same reader object continues as the primary.
        BigQueryStorageStreamSource.BigQueryStorageStreamReader primary = parent;
        BoundedSource residualSource = parent.splitAtFraction(0.5);
        Assert.assertNotNull((Object) residualSource);
        BoundedSource.BoundedReader residual = residualSource.createReader(this.options);

        Assert.assertTrue(primary.advance());
        Assert.assertEquals((Object) "C", ((TableRow) primary.getCurrent()).get((Object) "name"));
        Assert.assertFalse(primary.advance());

        Assert.assertTrue(residual.start());
        Assert.assertEquals((Object) "D", ((TableRow) residual.getCurrent()).get((Object) "name"));
        Assert.assertTrue(residual.advance());
        Assert.assertEquals((Object) "E", ((TableRow) residual.getCurrent()).get((Object) "name"));
        Assert.assertFalse(residual.advance());
    }

    /**
     * Splits the same reader twice in a row (first at {@code 0.83f}, then at 0.75) and verifies
     * that each split returns a non-null residual, leaves the current row untouched, and that the
     * reader afterwards yields "B", "C", "D" and then ends.
     */
    @Test
    public void testStreamSourceSplitAtFractionRepeated() throws Exception {
        ReadStream stream1 = ReadStream.newBuilder().setName("stream1").build();
        ReadStream stream2 = ReadStream.newBuilder().setName("stream2").build();
        ReadStream stream3 = ReadStream.newBuilder().setName("stream3").build();
        // Deliberately a float literal widened to double: the stubbed request and the actual
        // split call must carry the identical value.
        double firstFraction = (double) 0.83f;
        double secondFraction = 0.75;

        List<ReadRowsResponse> stream1Responses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA)}), 0.0, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA)}), 0.25, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("F", 6L, AVRO_SCHEMA)}), 0.5, 0.75));
        List<ReadRowsResponse> stream2Responses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA)}), 0.0, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA)}), 0.5, 0.75));
        List<ReadRowsResponse> stream3Responses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, Lists.newArrayList((Object[]) new GenericRecord[] {
                        BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA),
                        BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA)}), 0.8, 0.9));

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);

        // stream1 is read from the beginning and splits into stream2.
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream(stream1.getName()).build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) stream1Responses));
        Mockito.when((Object) storageClient.splitReadStream(
                        SplitReadStreamRequest.newBuilder().setName(stream1.getName()).setFraction(firstFraction).build()))
                .thenReturn((Object) SplitReadStreamResponse.newBuilder()
                        .setPrimaryStream(stream2)
                        .setRemainderStream(ReadStream.newBuilder().setName("ignored"))
                        .build());

        // stream2 is read from offset 1 and splits into stream3.
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream(stream2.getName()).setOffset(1L).build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) stream2Responses));
        Mockito.when((Object) storageClient.splitReadStream(
                        SplitReadStreamRequest.newBuilder().setName(stream2.getName()).setFraction(secondFraction).build()))
                .thenReturn((Object) SplitReadStreamResponse.newBuilder()
                        .setPrimaryStream(stream3)
                        .setRemainderStream(ReadStream.newBuilder().setName("ignored"))
                        .build());

        // stream3 is read from offset 2.
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream(stream3.getName()).setOffset(2L).build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) stream3Responses));

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();
        BigQueryStorageStreamSource source = BigQueryStorageStreamSource.create(
                (ReadSession) session,
                (ReadStream) stream1,
                (TableSchema) TABLE_SCHEMA,
                (SerializableFunction) new BigQueryIO.TableRowParser(),
                (Coder) TableRowJsonCoder.of(),
                (BigQueryServices) new FakeBigQueryServices().withStorageClient(storageClient));
        BoundedSource.BoundedReader reader = source.createReader(this.options);

        Assert.assertTrue(reader.start());
        Assert.assertEquals((Object) "A", ((TableRow) reader.getCurrent()).get((Object) "name"));

        // First split: residual exists and the current row is unchanged.
        BoundedSource residualSource = reader.splitAtFraction(firstFraction);
        Assert.assertNotNull((Object) residualSource);
        Assert.assertEquals((Object) "A", ((TableRow) reader.getCurrent()).get((Object) "name"));

        Assert.assertTrue(reader.advance());
        Assert.assertEquals((Object) "B", ((TableRow) reader.getCurrent()).get((Object) "name"));

        // Second split: residual exists and the current row is unchanged.
        residualSource = reader.splitAtFraction(secondFraction);
        Assert.assertNotNull((Object) residualSource);
        Assert.assertEquals((Object) "B", ((TableRow) reader.getCurrent()).get((Object) "name"));

        for (String expectedName : new String[] {"C", "D"}) {
            Assert.assertTrue(reader.advance());
            Assert.assertEquals((Object) expectedName, ((TableRow) reader.getCurrent()).get((Object) "name"));
        }
        Assert.assertFalse(reader.advance());
    }

    /**
     * Verifies that {@code splitAtFraction} returns null when the storage client answers the
     * split request with a default (empty) {@code SplitReadStreamResponse}, that a second split
     * attempt does not issue another {@code splitReadStream} call, and that the reader still
     * returns all remaining rows afterwards.
     */
    @Test
    public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws Exception {
        List<GenericRecord> firstBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA));
        List<GenericRecord> secondBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA));
        List<GenericRecord> thirdBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA));
        List<ReadRowsResponse> parentResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, firstBatch, 0.0, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, secondBatch, 0.25, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, thirdBatch, 0.5, 0.75));

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) parentResponses));
        // The split request is answered with an empty response.
        Mockito.when((Object) storageClient.splitReadStream(
                        SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build()))
                .thenReturn((Object) SplitReadStreamResponse.getDefaultInstance());

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession) session,
                (ReadStream) ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema) TABLE_SCHEMA,
                (SerializableFunction) new BigQueryIO.TableRowParser(),
                (Coder) TableRowJsonCoder.of(),
                (BigQueryServices) new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent =
                streamSource.createReader(this.options);

        Assert.assertTrue(parent.start());
        Assert.assertEquals((Object) "A", ((TableRow) parent.getCurrent()).get((Object) "name"));
        Assert.assertTrue(parent.advance());
        Assert.assertEquals((Object) "B", ((TableRow) parent.getCurrent()).get((Object) "name"));

        // First attempt: the split is refused and exactly one RPC has been made.
        Assert.assertNull((Object) parent.splitAtFraction(0.5));
        ((BigQueryServices.StorageClient) Mockito.verify((Object) storageClient, (VerificationMode) Mockito.times(1)))
                .splitReadStream((SplitReadStreamRequest) ArgumentMatchers.any());

        // Second attempt: still refused, and the RPC count stays at one.
        Assert.assertNull((Object) parent.splitAtFraction(0.5));
        ((BigQueryServices.StorageClient) Mockito.verify((Object) storageClient, (VerificationMode) Mockito.times(1)))
                .splitReadStream((SplitReadStreamRequest) ArgumentMatchers.any());

        // The parent reader keeps going through the rest of its own stream.
        for (String expectedName : new String[] {"C", "D", "E"}) {
            Assert.assertTrue(parent.advance());
            Assert.assertEquals((Object) expectedName, ((TableRow) parent.getCurrent()).get((Object) "name"));
        }
        Assert.assertFalse(parent.advance());
    }

    /**
     * Verifies that {@code splitAtFraction} returns null when reading the primary stream at the
     * reader's current offset throws {@code FailedPreconditionException}, and that the reader
     * then continues to return the remaining rows of the parent stream.
     */
    @Test
    public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() throws Exception {
        List<GenericRecord> firstBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA));
        List<GenericRecord> secondBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA));
        List<GenericRecord> thirdBatch = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("E", 5L, AVRO_SCHEMA));
        List<ReadRowsResponse> parentResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, firstBatch, 0.0, 0.25),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, secondBatch, 0.25, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, thirdBatch, 0.5, 0.75));

        SplitReadStreamResponse splitResponse = SplitReadStreamResponse.newBuilder()
                .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream"))
                .setRemainderStream(ReadStream.newBuilder().setName("remainderStream"))
                .build();
        // Error raised when the primary stream is read at offset 2.
        FailedPreconditionException invalidOffset = new FailedPreconditionException(
                "Given row offset is invalid for stream.",
                (Throwable) new StatusRuntimeException(Status.FAILED_PRECONDITION),
                (StatusCode) GrpcStatusCode.of((Status.Code) Status.Code.FAILED_PRECONDITION),
                false);

        BigQueryServices.StorageClient storageClient =
                (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) parentResponses));
        Mockito.when((Object) storageClient.splitReadStream(
                        SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build()))
                .thenReturn((Object) splitResponse);
        Mockito.when((Object) storageClient.readRows(
                        ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(2L).build(), ""))
                .thenThrow(new Throwable[] {invalidOffset});

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession) session,
                (ReadStream) ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema) TABLE_SCHEMA,
                (SerializableFunction) new BigQueryIO.TableRowParser(),
                (Coder) TableRowJsonCoder.of(),
                (BigQueryServices) new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent =
                streamSource.createReader(this.options);

        Assert.assertTrue(parent.start());
        Assert.assertEquals((Object) "A", ((TableRow) parent.getCurrent()).get((Object) "name"));
        Assert.assertTrue(parent.advance());
        Assert.assertEquals((Object) "B", ((TableRow) parent.getCurrent()).get((Object) "name"));

        // The split is abandoned because the primary stream cannot be opened.
        Assert.assertNull((Object) parent.splitAtFraction(0.5));

        // The parent reader carries on with its own stream.
        for (String expectedName : new String[] {"C", "D", "E"}) {
            Assert.assertTrue(parent.advance());
            Assert.assertEquals((Object) expectedName, ((TableRow) parent.getCurrent()).get((Object) "name"));
        }
        Assert.assertFalse(parent.advance());
    }

    /**
     * Runs a {@code BigQueryIO.read} with method DIRECT_READ and Avro format against a fake
     * dataset service and a mocked (serializable) storage client, and asserts that the pipeline
     * output contains the four expected key/value pairs.
     */
    @Test
    public void testReadFromBigQueryIO() throws Exception {
        final String tableSpec = "foo.com:project:dataset.table";

        // Register the table with the fake dataset service.
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableReference = BigQueryHelpers.parseTableSpec((String) tableSpec);
        Table fakeTable = new Table()
                .setTableReference(tableReference)
                .setNumBytes(Long.valueOf(10L))
                .setSchema(TABLE_SCHEMA);
        this.fakeDatasetService.createTable(fakeTable);

        // Requests the mocked storage client is stubbed for.
        CreateReadSessionRequest sessionRequest = CreateReadSessionRequest.newBuilder()
                .setParent("projects/project-id")
                .setReadSession(ReadSession.newBuilder()
                        .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                        .setDataFormat(DataFormat.AVRO))
                .setMaxStreamCount(10)
                .build();
        ReadRowsRequest rowsRequest = ReadRowsRequest.newBuilder().setReadStream("streamName").build();

        // Canned answers: one session with a single stream, four records in two responses.
        ReadSession session = ReadSession.newBuilder()
                .setName("readSessionName")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .addStreams(ReadStream.newBuilder().setName("streamName"))
                .setDataFormat(DataFormat.AVRO)
                .build();
        List<GenericRecord> records = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", 1L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", 2L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("C", 3L, AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("D", 4L, AVRO_SCHEMA));
        List<ReadRowsResponse> rowsResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.5),
                BigQueryIOStorageReadTest.createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient) Mockito.mock(
                BigQueryServices.StorageClient.class, (MockSettings) Mockito.withSettings().serializable());
        Mockito.when((Object) storageClient.createReadSession(sessionRequest)).thenReturn((Object) session);
        Mockito.when((Object) storageClient.readRows(rowsRequest, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) rowsResponses));

        BigQueryServices testServices = new FakeBigQueryServices()
                .withDatasetService(this.fakeDatasetService)
                .withStorageClient(storageClient);
        PCollection output = (PCollection) this.p.apply((PTransform) BigQueryIO
                .read((SerializableFunction) new ParseKeyValue())
                .from(tableSpec)
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .withFormat(DataFormat.AVRO)
                .withTestServices((BigQueryServices) testServices));

        Iterable expectedPairs = ImmutableList.of(
                (Object) KV.of((Object) "A", (Object) 1L),
                (Object) KV.of((Object) "B", (Object) 2L),
                (Object) KV.of((Object) "C", (Object) 3L),
                (Object) KV.of((Object) "D", (Object) 4L));
        PAssert.that((PCollection) output).containsInAnyOrder((Iterable) expectedPairs);
        this.p.run();
    }

    /**
     * Runs {@code BigQueryIO.readTableRows} with method DIRECT_READ, Avro format and the single
     * selected field "name", and asserts that the pipeline output contains four table rows that
     * carry only that field.
     */
    @Test
    public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
        final String tableSpec = "foo.com:project:dataset.table";

        // Register the table with the fake dataset service.
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableReference = BigQueryHelpers.parseTableSpec((String) tableSpec);
        Table fakeTable = new Table()
                .setTableReference(tableReference)
                .setNumBytes(Long.valueOf(10L))
                .setSchema(TABLE_SCHEMA);
        this.fakeDatasetService.createTable(fakeTable);

        // The expected session request carries the selected field in its read options.
        CreateReadSessionRequest sessionRequest = CreateReadSessionRequest.newBuilder()
                .setParent("projects/project-id")
                .setReadSession(ReadSession.newBuilder()
                        .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                        .setReadOptions(ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
                        .setDataFormat(DataFormat.AVRO))
                .setMaxStreamCount(10)
                .build();
        ReadRowsRequest rowsRequest = ReadRowsRequest.newBuilder().setReadStream("streamName").build();

        // Canned answers built on the trimmed Avro schema.
        ReadSession session = ReadSession.newBuilder()
                .setName("readSessionName")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
                .addStreams(ReadStream.newBuilder().setName("streamName"))
                .setDataFormat(DataFormat.AVRO)
                .build();
        List<GenericRecord> records = Lists.newArrayList(
                BigQueryIOStorageReadTest.createRecord("A", TRIMMED_AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("B", TRIMMED_AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("C", TRIMMED_AVRO_SCHEMA),
                BigQueryIOStorageReadTest.createRecord("D", TRIMMED_AVRO_SCHEMA));
        List<ReadRowsResponse> rowsResponses = Lists.newArrayList(
                BigQueryIOStorageReadTest.createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.5),
                BigQueryIOStorageReadTest.createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient) Mockito.mock(
                BigQueryServices.StorageClient.class, (MockSettings) Mockito.withSettings().serializable());
        Mockito.when((Object) storageClient.createReadSession(sessionRequest)).thenReturn((Object) session);
        Mockito.when((Object) storageClient.readRows(rowsRequest, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) rowsResponses));

        BigQueryServices testServices = new FakeBigQueryServices()
                .withDatasetService(this.fakeDatasetService)
                .withStorageClient(storageClient);
        List selectedFields = Lists.newArrayList((Object[]) new String[] {"name"});
        PCollection output = (PCollection) this.p.apply((PTransform) BigQueryIO.readTableRows()
                .from(tableSpec)
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .withSelectedFields((List) selectedFields)
                .withFormat(DataFormat.AVRO)
                .withTestServices((BigQueryServices) testServices));

        Iterable expectedRows = ImmutableList.of(
                (Object) new TableRow().set("name", (Object) "A"),
                (Object) new TableRow().set("name", (Object) "B"),
                (Object) new TableRow().set("name", (Object) "C"),
                (Object) new TableRow().set("name", (Object) "D"));
        PAssert.that((PCollection) output).containsInAnyOrder((Iterable) expectedRows);
        this.p.run();
    }

    /**
     * Arrow-format counterpart of the DIRECT_READ pipeline test: the mocked storage client serves
     * an Arrow-schema session and two Arrow responses, and the pipeline output must contain the
     * four expected key/value pairs.
     */
    @Test
    public void testReadFromBigQueryIOArrow() throws Exception {
        final String tableSpec = "foo.com:project:dataset.table";

        // Register the table with the fake dataset service.
        this.fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
        TableReference tableReference = BigQueryHelpers.parseTableSpec((String) tableSpec);
        Table fakeTable = new Table()
                .setTableReference(tableReference)
                .setNumBytes(Long.valueOf(10L))
                .setSchema(TABLE_SCHEMA);
        this.fakeDatasetService.createTable(fakeTable);

        // Requests the mocked storage client is stubbed for.
        CreateReadSessionRequest sessionRequest = CreateReadSessionRequest.newBuilder()
                .setParent("projects/project-id")
                .setReadSession(ReadSession.newBuilder()
                        .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                        .setDataFormat(DataFormat.ARROW))
                .setMaxStreamCount(10)
                .build();
        ReadRowsRequest rowsRequest = ReadRowsRequest.newBuilder().setReadStream("streamName").build();

        // Canned session whose schema is the serialized Arrow schema.
        ArrowSchema arrowSchema = ArrowSchema.newBuilder()
                .setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA))
                .build();
        ReadSession session = ReadSession.newBuilder()
                .setName("readSessionName")
                .setArrowSchema(arrowSchema)
                .addStreams(ReadStream.newBuilder().setName("streamName"))
                .setDataFormat(DataFormat.ARROW)
                .build();

        // Four name/value pairs delivered as two Arrow responses of two rows each.
        List<String> names = Arrays.asList("A", "B", "C", "D");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L);
        List<ReadRowsResponse> rowsResponses = Lists.newArrayList(
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.5, 0.75));

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient) Mockito.mock(
                BigQueryServices.StorageClient.class, (MockSettings) Mockito.withSettings().serializable());
        Mockito.when((Object) storageClient.createReadSession(sessionRequest)).thenReturn((Object) session);
        Mockito.when((Object) storageClient.readRows(rowsRequest, ""))
                .thenReturn((Object) new FakeBigQueryServices.FakeBigQueryServerStream((List) rowsResponses));

        BigQueryServices testServices = new FakeBigQueryServices()
                .withDatasetService(this.fakeDatasetService)
                .withStorageClient(storageClient);
        PCollection output = (PCollection) this.p.apply((PTransform) BigQueryIO
                .read((SerializableFunction) new ParseKeyValue())
                .from(tableSpec)
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .withFormat(DataFormat.ARROW)
                .withTestServices((BigQueryServices) testServices));

        Iterable expectedPairs = ImmutableList.of(
                (Object) KV.of((Object) "A", (Object) 1L),
                (Object) KV.of((Object) "B", (Object) 2L),
                (Object) KV.of((Object) "C", (Object) 3L),
                (Object) KV.of((Object) "D", (Object) 4L));
        PAssert.that((PCollection) output).containsInAnyOrder((Iterable) expectedPairs);
        this.p.run();
    }

    /**
     * Reads an Arrow-formatted stream to exhaustion through a {@code BigQueryStorageStreamSource}
     * reader and verifies that all three rows served by the mocked storage client are returned.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testReadFromStreamSourceArrow() throws Exception {
        ReadSession readSession = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        ReadRowsRequest expectedRequest = ReadRowsRequest.newBuilder().setReadStream("readStream").build();
        List<String> names = Arrays.asList("A", "B", "C");
        List<Long> values = Arrays.asList(1L, 2L, 3L);
        // The three rows are served as two responses: (A, B) followed by (C).
        ArrayList responses = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5, 0.75)});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(expectedRequest, "")).thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)responses));
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((ReadSession)readSession, (ReadStream)ReadStream.newBuilder().setName("readStream").build(), (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        ArrayList<TableRow> rows = new ArrayList<TableRow>();
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader = streamSource.createReader(this.options);
        for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
            rows.add((TableRow)reader.getCurrent());
        }
        // Report the collected rows through the assertion message on failure
        // instead of printing them to stdout on every test run.
        Assert.assertEquals((String)("Rows: " + rows), (long)3L, (long)rows.size());
    }

    /**
     * Walks an Arrow stream one row at a time and checks the fraction consumed that the reader
     * reports before reading, after every row, and once the stream is exhausted.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testFractionConsumedArrow() throws Exception {
        ArrowSchema arrowSchema = ArrowSchema.newBuilder()
                .setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA))
                .build();
        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(arrowSchema)
                .setDataFormat(DataFormat.ARROW)
                .build();
        ReadRowsRequest readRequest = ReadRowsRequest.newBuilder().setReadStream("readStream").build();

        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        // Four responses: (A, B), an empty one, (C, D) and (E, F, G). The trailing two doubles of each
        // call presumably bound the progress range of that response -- createResponseArrow is not
        // visible here, confirm against its definition.
        ReadRowsResponse[] batches = new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, Lists.newArrayList(), Lists.newArrayList(), 0.25, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.3, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 1.0)};
        ArrayList responses = Lists.newArrayList((Object[])batches);

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)storageClient.readRows(readRequest, "")).thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)responses));

        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession)session,
                (ReadStream)ReadStream.newBuilder().setName("readStream").build(),
                (TableSchema)TABLE_SCHEMA,
                (SerializableFunction)new BigQueryIO.TableRowParser(),
                (Coder)TableRowJsonCoder.of(),
                (BigQueryServices)new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader = streamSource.createReader(this.options);

        // Nothing has been read yet.
        Assert.assertEquals((double)0.0, (double)reader.getFractionConsumed(), (double)1.0E-6);

        // First row (A).
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((double)0.125, (double)reader.getFractionConsumed(), (double)1.0E-6);

        // Remaining rows B through G, one advance() per row.
        double[] expectedAfterAdvance = new double[]{0.25, 0.4, 0.5, 0.8, 0.9, 1.0};
        for (double expected : expectedAfterAdvance) {
            Assert.assertTrue((boolean)reader.advance());
            Assert.assertEquals((double)expected, (double)reader.getFractionConsumed(), (double)1.0E-6);
        }

        // The stream is exhausted; the reported fraction is compared for exact equality with 1.0.
        Assert.assertFalse((boolean)reader.advance());
        Assert.assertEquals((Object)Double.valueOf(1.0), (Object)reader.getFractionConsumed());
    }

    /**
     * Checks the fraction consumed reported by an Arrow stream reader before and after a
     * {@code splitAtFraction(0.5)} call that swaps the parent stream for the primary stream
     * returned by the mocked storage client.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testFractionConsumedWithSplitArrow() throws Exception {
        ReadSession readSession = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        ReadRowsRequest expectedRequest = ReadRowsRequest.newBuilder().setReadStream("parentStream").build();
        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);

        // Parent stream: (A, B), (C, D), (E, F, G).
        ArrayList parentResponse = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.3, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 0.875)});
        BigQueryServices.StorageClient fakeStorageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)fakeStorageClient.readRows(expectedRequest, "")).thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)parentResponse));
        Mockito.when((Object)fakeStorageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build())).thenReturn((Object)SplitReadStreamResponse.newBuilder().setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")).setRemainderStream(ReadStream.newBuilder().setName("remainderStream")).build());

        // Primary stream, requested from offset 1: (B, C), (D).
        ArrayList primaryResponses = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(1, 3), values.subList(1, 3), 0.25, 0.75),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(3, 4), values.subList(3, 4), 0.8, 1.0)});
        Mockito.when((Object)fakeStorageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1L).build(), "")).thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)primaryResponses));

        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create((ReadSession)readSession, (ReadStream)ReadStream.newBuilder().setName("parentStream").build(), (TableSchema)TABLE_SCHEMA, (SerializableFunction)new BigQueryIO.TableRowParser(), (Coder)TableRowJsonCoder.of(), (BigQueryServices)new FakeBigQueryServices().withStorageClient(fakeStorageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader reader = streamSource.createReader(this.options);

        Assert.assertEquals((double)0.0, (double)reader.getFractionConsumed(), (double)1.0E-6);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((double)0.125, (double)reader.getFractionConsumed(), (double)1.0E-6);

        // Fail right here if the split is rejected, instead of discarding the result and
        // relying on the later fraction checks to reveal it indirectly.
        Assert.assertNotNull((Object)reader.splitAtFraction(0.5));
        Assert.assertEquals((double)0.125, (double)reader.getFractionConsumed(), (double)1.0E-6);

        // The remaining rows come from the primary stream.
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((double)0.5, (double)reader.getFractionConsumed(), (double)1.0E-6);
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((double)0.75, (double)reader.getFractionConsumed(), (double)1.0E-6);
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((double)1.0, (double)reader.getFractionConsumed(), (double)1.0E-6);
        Assert.assertFalse((boolean)reader.advance());
        Assert.assertEquals((double)1.0, (double)reader.getFractionConsumed(), (double)1.0E-6);
    }

    /**
     * Splits an Arrow stream source at fraction 0.5 after two rows have been read and verifies
     * that the primary reader yields row C only while the residual source yields rows D and E.
     *
     * @throws Exception if building the fixtures or reading from the sources fails
     */
    @Test
    public void testStreamSourceSplitAtFractionSucceedsArrow() throws Exception {
        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        // Parent stream: (A, B), (C), (D, E).
        ReadRowsResponse[] parentBatches = new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.25, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(3, 5), values.subList(3, 5), 0.5, 0.75)};
        ArrayList parentResponses = Lists.newArrayList((Object[])parentBatches);

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)parentResponses));
        Mockito.when((Object)storageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build()))
                .thenReturn((Object)SplitReadStreamResponse.newBuilder().setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")).setRemainderStream(ReadStream.newBuilder().setName("remainderStream")).build());
        // Primary stream from offset 2 serves the middle response (C); the remainder serves the rest (D, E).
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(2L).build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses.subList(1, 2)));
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream(parentResponses.subList(2, parentResponses.size())));

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession)session,
                (ReadStream)ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema)TABLE_SCHEMA,
                (SerializableFunction)new BigQueryIO.TableRowParser(),
                (Coder)TableRowJsonCoder.of(),
                (BigQueryServices)new FakeBigQueryServices().withStorageClient(storageClient));

        // Read A and B from the parent stream before splitting.
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);
        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));

        // After the split the same reader object continues as the primary reader.
        BigQueryStorageStreamSource.BigQueryStorageStreamReader primary = parent;
        BoundedSource residualSource = parent.splitAtFraction(0.5);
        Assert.assertNotNull((Object)residualSource);
        BoundedSource.BoundedReader residual = residualSource.createReader(this.options);

        // Primary: C, then end of stream.
        Assert.assertTrue((boolean)primary.advance());
        Assert.assertEquals((Object)"C", (Object)((TableRow)primary.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)primary.advance());

        // Residual: D, E, then end of stream.
        Assert.assertTrue((boolean)residual.start());
        Assert.assertEquals((Object)"D", (Object)((TableRow)residual.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)residual.advance());
        Assert.assertEquals((Object)"E", (Object)((TableRow)residual.getCurrent()).get((Object)"name"));
        Assert.assertFalse((boolean)residual.advance());
    }

    /**
     * Splits the same Arrow stream reader twice in a row (first at {@code 0.83f}, then at 0.75) and
     * verifies that the current row is unchanged by each split and that rows A through D are read
     * in order across the three streams.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testStreamSourceSplitAtFractionRepeatedArrow() throws Exception {
        ReadStream firstStream = ReadStream.newBuilder().setName("stream1").build();
        ReadStream secondStream = ReadStream.newBuilder().setName("stream2").build();
        ReadStream thirdStream = ReadStream.newBuilder().setName("stream3").build();
        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        // The first split fraction is a float literal widened to double; the mocked request and the
        // splitAtFraction call below must use this exact same value.
        double firstFraction = (double)0.83f;

        // stream1: (A, B), (C, D), (E, F).
        ArrayList parentResponses = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.25, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(4, 6), values.subList(4, 6), 0.5, 0.75)});
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream(firstStream.getName()).build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)parentResponses));
        Mockito.when((Object)storageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName(firstStream.getName()).setFraction(firstFraction).build()))
                .thenReturn((Object)SplitReadStreamResponse.newBuilder().setPrimaryStream(secondStream).setRemainderStream(ReadStream.newBuilder().setName("ignored")).build());

        // stream2, requested from offset 1: (B, C), (D).
        ArrayList otherResponses = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(1, 3), values.subList(1, 3), 0.0, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(3, 4), values.subList(3, 4), 0.5, 0.75)});
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream(secondStream.getName()).setOffset(1L).build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)otherResponses));
        Mockito.when((Object)storageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName(secondStream.getName()).setFraction(0.75).build()))
                .thenReturn((Object)SplitReadStreamResponse.newBuilder().setPrimaryStream(thirdStream).setRemainderStream(ReadStream.newBuilder().setName("ignored")).build());

        // stream3, requested from offset 2: (C, D).
        ArrayList lastResponses = Lists.newArrayList((Object[])new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.8, 0.9)});
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream(thirdStream.getName()).setOffset(2L).build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)lastResponses));

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        BigQueryStorageStreamSource source = BigQueryStorageStreamSource.create(
                (ReadSession)session,
                (ReadStream)firstStream,
                (TableSchema)TABLE_SCHEMA,
                (SerializableFunction)new BigQueryIO.TableRowParser(),
                (Coder)TableRowJsonCoder.of(),
                (BigQueryServices)new FakeBigQueryServices().withStorageClient(storageClient));
        BoundedSource.BoundedReader reader = source.createReader(this.options);

        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));

        // First split: the current row must still be A afterwards.
        BoundedSource residualSource = reader.splitAtFraction(firstFraction);
        Assert.assertNotNull((Object)residualSource);
        Assert.assertEquals((Object)"A", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));

        // Second split: the current row must still be B afterwards.
        residualSource = reader.splitAtFraction(0.75);
        Assert.assertNotNull((Object)residualSource);
        Assert.assertEquals((Object)"B", (Object)((TableRow)reader.getCurrent()).get((Object)"name"));

        // Rows C and D follow, then the reader is exhausted.
        for (String expectedName : Arrays.asList("C", "D")) {
            Assert.assertTrue((boolean)reader.advance());
            Assert.assertEquals((Object)expectedName, (Object)((TableRow)reader.getCurrent()).get((Object)"name"));
        }
        Assert.assertFalse((boolean)reader.advance());
    }

    /**
     * Verifies that {@code splitAtFraction} returns null when the storage client answers the split
     * request with an empty (default) response, that a second attempt does not issue another
     * split request, and that the parent reader still yields all of its rows afterwards.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossibleArrow() throws Exception {
        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        // Parent stream: (A, B), (C), (D, E).
        ReadRowsResponse[] parentBatches = new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.25, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(3, 5), values.subList(3, 5), 0.5, 0.75)};
        ArrayList parentResponses = Lists.newArrayList((Object[])parentBatches);

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)parentResponses));
        // The split request is answered with the default (empty) response.
        Mockito.when((Object)storageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build()))
                .thenReturn((Object)SplitReadStreamResponse.getDefaultInstance());

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession)session,
                (ReadStream)ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema)TABLE_SCHEMA,
                (SerializableFunction)new BigQueryIO.TableRowParser(),
                (Coder)TableRowJsonCoder.of(),
                (BigQueryServices)new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);

        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));

        // First attempt: rejected, exactly one split request seen by the client.
        Assert.assertNull((Object)parent.splitAtFraction(0.5));
        ((BigQueryServices.StorageClient)Mockito.verify((Object)storageClient, (VerificationMode)Mockito.times((int)1))).splitReadStream((SplitReadStreamRequest)ArgumentMatchers.any());

        // Second attempt: rejected again, and the request count is still one.
        Assert.assertNull((Object)parent.splitAtFraction(0.5));
        ((BigQueryServices.StorageClient)Mockito.verify((Object)storageClient, (VerificationMode)Mockito.times((int)1))).splitReadStream((SplitReadStreamRequest)ArgumentMatchers.any());

        // The parent keeps reading its own stream: C, D, E, then end of stream.
        for (String expectedName : Arrays.asList("C", "D", "E")) {
            Assert.assertTrue((boolean)parent.advance());
            Assert.assertEquals((Object)expectedName, (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        }
        Assert.assertFalse((boolean)parent.advance());
    }

    /**
     * Verifies that {@code splitAtFraction} returns null when reading the primary stream at the
     * parent's current offset fails with {@code FailedPreconditionException}, and that the parent
     * reader still yields all of its remaining rows afterwards.
     *
     * @throws Exception if building the fixtures or reading from the source fails
     */
    @Test
    public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow() throws Exception {
        List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F");
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        // Parent stream: (A, B), (C), (D, E).
        ReadRowsResponse[] parentBatches = new ReadRowsResponse[]{
                this.createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.25, 0.5),
                this.createResponseArrow(ARROW_SCHEMA, names.subList(3, 5), values.subList(3, 5), 0.5, 0.75)};
        ArrayList parentResponses = Lists.newArrayList((Object[])parentBatches);

        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient)Mockito.mock(BigQueryServices.StorageClient.class);
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
                .thenReturn((Object)new FakeBigQueryServices.FakeBigQueryServerStream((List)parentResponses));
        Mockito.when((Object)storageClient.splitReadStream(SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5).build()))
                .thenReturn((Object)SplitReadStreamResponse.newBuilder().setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")).setRemainderStream(ReadStream.newBuilder().setName("remainderStream")).build());
        // Reading the primary stream from offset 2 is rejected with FAILED_PRECONDITION.
        Mockito.when((Object)storageClient.readRows(ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(2L).build(), ""))
                .thenThrow(new Throwable[]{new FailedPreconditionException("Given row offset is invalid for stream.", (Throwable)new StatusRuntimeException(Status.FAILED_PRECONDITION), (StatusCode)GrpcStatusCode.of((Status.Code)Status.Code.FAILED_PRECONDITION), false)});

        ReadSession session = ReadSession.newBuilder()
                .setName("readSession")
                .setArrowSchema(ArrowSchema.newBuilder().setSerializedSchema(BigQueryIOStorageReadTest.serializeArrowSchema(ARROW_SCHEMA)).build())
                .setDataFormat(DataFormat.ARROW)
                .build();
        BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create(
                (ReadSession)session,
                (ReadStream)ReadStream.newBuilder().setName("parentStream").build(),
                (TableSchema)TABLE_SCHEMA,
                (SerializableFunction)new BigQueryIO.TableRowParser(),
                (Coder)TableRowJsonCoder.of(),
                (BigQueryServices)new FakeBigQueryServices().withStorageClient(storageClient));
        BigQueryStorageStreamSource.BigQueryStorageStreamReader parent = streamSource.createReader(this.options);

        Assert.assertTrue((boolean)parent.start());
        Assert.assertEquals((Object)"A", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        Assert.assertTrue((boolean)parent.advance());
        Assert.assertEquals((Object)"B", (Object)((TableRow)parent.getCurrent()).get((Object)"name"));

        // The split attempt is abandoned and reported as null.
        Assert.assertNull((Object)parent.splitAtFraction(0.5));

        // The parent keeps reading its own stream: C, D, E, then end of stream.
        for (String expectedName : Arrays.asList("C", "D", "E")) {
            Assert.assertTrue((boolean)parent.advance());
            Assert.assertEquals((Object)expectedName, (Object)((TableRow)parent.getCurrent()).get((Object)"name"));
        }
        Assert.assertFalse((boolean)parent.advance());
    }

    /**
     * Builds an Arrow {@code Field} with the given name, nullability, type and child fields.
     * The remaining two {@code FieldType} constructor arguments are passed as {@code null}.
     *
     * @param name the field name
     * @param nullable whether the field is nullable
     * @param type the Arrow type of the field
     * @param children the child fields, possibly none
     * @return the newly constructed field
     */
    private static Field field(String name, boolean nullable, ArrowType type, Field ... children) {
        FieldType fieldType = new FieldType(nullable, type, null, null);
        List<Field> childFields = Arrays.asList(children);
        return new Field(name, fieldType, childFields);
    }

    /**
     * Builds a non-nullable Arrow {@code Field} by delegating to the four-argument overload.
     *
     * @param name the field name
     * @param type the Arrow type of the field
     * @param children the child fields, possibly none
     * @return the newly constructed non-nullable field
     */
    static Field field(String name, ArrowType type, Field ... children) {
        boolean nullable = false;
        return BigQueryIOStorageReadTest.field(name, nullable, type, children);
    }

    /**
     * Parse function that turns a {@code SchemaAndRecord} into a {@code KV} keyed by the string
     * form of the record's "name" field, with the record's "number" field (cast to {@code Long})
     * as the value.
     */
    private static final class ParseKeyValue
    implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
        private ParseKeyValue() {
        }

        /**
         * @param input the schema/record pair to convert
         * @return a key-value pair of the record's "name" and "number" fields
         */
        public KV<String, Long> apply(SchemaAndRecord input) {
            String key = input.getRecord().get("name").toString();
            Long value = (Long)input.getRecord().get("number");
            return KV.of((Object)key, (Object)value);
        }
    }
}

