package org.apache.beam.sdk.io.gcp.bigquery.providers;

import com.google.api.services.bigquery.model.Table;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.StreamStats;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.class */
public class BigQueryDirectReadSchemaTransformProviderTest {
    private static final String TABLE_SPEC = "my-project:dataset.table";
    private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions();
    private static final String AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"str\", \"type\": \"string\"},\n     {\"name\": \"num\", \"type\": \"long\"},\n     {\"name\": \"dt\", \"type\": \"string\", \"logicalType\": \"datetime\"}\n ]\n}";
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
    private static final org.apache.beam.sdk.schemas.Schema SCHEMA = org.apache.beam.sdk.schemas.Schema.of(new Schema.Field[]{Schema.Field.of("str", Schema.FieldType.STRING), Schema.Field.of("num", Schema.FieldType.INT64), Schema.Field.of("dt", Schema.FieldType.logicalType(SqlTypes.DATETIME))});
    private static final List<Row> ROWS = Arrays.asList(Row.withSchema(SCHEMA).withFieldValue("str", "a").withFieldValue("num", 1L).withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00")).build(), Row.withSchema(SCHEMA).withFieldValue("str", "b").withFieldValue("num", 2L).withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00")).build(), Row.withSchema(SCHEMA).withFieldValue("str", "c").withFieldValue("num", 3L).withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00")).build());
    private static final String TRIMMED_AVRO_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"RowRecord\",\n \"fields\": [\n     {\"name\": \"str\", \"type\": \"string\"}\n ]\n}";
    private static final org.apache.avro.Schema TRIMMED_AVRO_SCHEMA = new Schema.Parser().parse(TRIMMED_AVRO_SCHEMA_STRING);
    private static final org.apache.beam.sdk.schemas.Schema TRIMMED_SCHEMA = org.apache.beam.sdk.schemas.Schema.of(new Schema.Field[]{Schema.Field.of("str", Schema.FieldType.STRING)});
    private static final List<Row> TRIMMED_ROWS = Arrays.asList(Row.withSchema(TRIMMED_SCHEMA).withFieldValue("str", "b").build(), Row.withSchema(TRIMMED_SCHEMA).withFieldValue("str", "c").build());
    private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
    private final FakeJobService fakeJobService = new FakeJobService();
    private final FakeBigQueryServices fakeBigQueryServices = new FakeBigQueryServices().withJobService(this.fakeJobService).withDatasetService(this.fakeDatasetService);

    @Rule
    public final transient TestPipeline p = TestPipeline.fromOptions(testOptions);

    @Before
    public void setUp() throws Exception {
        FakeDatasetService.setUp();
        Table schema = new Table().setTableReference(BigQueryHelpers.parseTableSpec(TABLE_SPEC)).setNumBytes(10L).setSchema(BigQueryUtils.toTableSchema(SCHEMA));
        this.fakeDatasetService.createDataset("my-project", "dataset", "", "test_dataset", (Long) null);
        this.fakeDatasetService.createTable(schema);
        testOptions.as(BigQueryOptions.class).setProject("parent-project");
    }

    private static GenericRecord createRecord(String str, long j, String str2, org.apache.avro.Schema schema) {
        GenericData.Record record = new GenericData.Record(schema);
        record.put("str", str);
        record.put("num", Long.valueOf(j));
        record.put("dt", str2);
        return record;
    }

    private static GenericRecord createRecord(String str, org.apache.avro.Schema schema) {
        GenericData.Record record = new GenericData.Record(schema);
        record.put("str", str);
        return record;
    }

    private static ReadRowsResponse createResponse(org.apache.avro.Schema schema, Collection<GenericRecord> collection, double d, double d2) throws Exception {
        GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null);
        Iterator<GenericRecord> it = collection.iterator();
        while (it.hasNext()) {
            genericDatumWriter.write(it.next(), binaryEncoder);
        }
        binaryEncoder.flush();
        return ReadRowsResponse.newBuilder().setAvroRows(AvroRows.newBuilder().setSerializedBinaryRows(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).setRowCount(collection.size())).setRowCount(collection.size()).setStats(StreamStats.newBuilder().setProgress(StreamStats.Progress.newBuilder().setAtResponseStart(d).setAtResponseEnd(d2))).build();
    }

    @Test
    public void testValidateConfig() {
        for (BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration bigQueryDirectReadSchemaTransformConfiguration : Arrays.asList(BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration.builder().setQuery("SELECT * FROM project:dataset.table").setTableSpec("project:dataset.table").build(), BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration.builder().setQuery("SELECT * FROM project:dataset.table").setRowRestriction("num > 10").build(), BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration.builder().setTableSpec("not a table spec").build())) {
            Assert.assertThrows(IllegalArgumentException.class, () -> {
                bigQueryDirectReadSchemaTransformConfiguration.validate();
            });
        }
    }

    @Test
    public void testDirectRead() throws Exception {
        CreateReadSessionRequest build = CreateReadSessionRequest.newBuilder().setParent("projects/parent-project").setReadSession(ReadSession.newBuilder().setTable("projects/my-project/datasets/dataset/tables/table").setDataFormat(DataFormat.AVRO)).setMaxStreamCount(10).build();
        ReadSession build2 = ReadSession.newBuilder().setName("readSessionName").setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)).addStreams(ReadStream.newBuilder().setName("streamName")).setDataFormat(DataFormat.AVRO).build();
        ReadRowsRequest build3 = ReadRowsRequest.newBuilder().setReadStream("streamName").build();
        List asList = Arrays.asList(createRecord("a", 1L, "2000-01-01T00:00:00", AVRO_SCHEMA), createRecord("b", 2L, "2000-01-02T00:00:00", AVRO_SCHEMA), createRecord("c", 3L, "2000-01-03T00:00:00", AVRO_SCHEMA));
        List asList2 = Arrays.asList(createResponse(AVRO_SCHEMA, asList.subList(0, 2), 0.0d, 0.5d), createResponse(AVRO_SCHEMA, asList.subList(2, 3), 0.5d, 0.75d));
        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class, Mockito.withSettings().serializable());
        Mockito.when(storageClient.createReadSession(build)).thenReturn(build2);
        Mockito.when(storageClient.readRows(build3, "")).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(asList2));
        BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration build4 = BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration.builder().setTableSpec(TABLE_SPEC).build();
        BigQueryDirectReadSchemaTransformProvider bigQueryDirectReadSchemaTransformProvider = new BigQueryDirectReadSchemaTransformProvider();
        BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform from = bigQueryDirectReadSchemaTransformProvider.from(build4);
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.p);
        String str = (String) bigQueryDirectReadSchemaTransformProvider.outputCollectionNames().get(0);
        from.setBigQueryServices(this.fakeBigQueryServices.withStorageClient(storageClient));
        PCollectionRowTuple apply = empty.apply(from);
        Assert.assertTrue(apply.has(str));
        PAssert.that(apply.get(str)).containsInAnyOrder(ROWS);
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testDirectReadWithSelectedFieldsAndRowRestriction() throws Exception {
        CreateReadSessionRequest build = CreateReadSessionRequest.newBuilder().setParent("projects/parent-project").setReadSession(ReadSession.newBuilder().setTable("projects/my-project/datasets/dataset/tables/table").setReadOptions(ReadSession.TableReadOptions.newBuilder().addSelectedFields("str").setRowRestriction("num > 1")).setDataFormat(DataFormat.AVRO)).setMaxStreamCount(10).build();
        ReadSession build2 = ReadSession.newBuilder().setName("readSessionName").setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING)).addStreams(ReadStream.newBuilder().setName("streamName")).setDataFormat(DataFormat.AVRO).build();
        ReadRowsRequest build3 = ReadRowsRequest.newBuilder().setReadStream("streamName").build();
        List asList = Arrays.asList(createRecord("b", TRIMMED_AVRO_SCHEMA), createRecord("c", TRIMMED_AVRO_SCHEMA));
        List asList2 = Arrays.asList(createResponse(TRIMMED_AVRO_SCHEMA, asList.subList(0, 1), 0.0d, 0.5d), createResponse(TRIMMED_AVRO_SCHEMA, asList.subList(1, 2), 0.5d, 0.75d));
        BigQueryServices.StorageClient storageClient = (BigQueryServices.StorageClient) Mockito.mock(BigQueryServices.StorageClient.class, Mockito.withSettings().serializable());
        Mockito.when(storageClient.createReadSession(build)).thenReturn(build2);
        Mockito.when(storageClient.readRows(build3, "")).thenReturn(new FakeBigQueryServices.FakeBigQueryServerStream(asList2));
        BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration build4 = BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration.builder().setTableSpec(TABLE_SPEC).setSelectedFields(Arrays.asList("str")).setRowRestriction("num > 1").build();
        BigQueryDirectReadSchemaTransformProvider bigQueryDirectReadSchemaTransformProvider = new BigQueryDirectReadSchemaTransformProvider();
        BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform from = bigQueryDirectReadSchemaTransformProvider.from(build4);
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.p);
        String str = (String) bigQueryDirectReadSchemaTransformProvider.outputCollectionNames().get(0);
        from.setBigQueryServices(this.fakeBigQueryServices.withStorageClient(storageClient));
        PCollectionRowTuple apply = empty.apply(from);
        Assert.assertTrue(apply.has(str));
        PAssert.that(apply.get(str)).containsInAnyOrder(TRIMMED_ROWS);
        this.p.run().waitUntilFinish();
    }
}
