/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
import org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BigQueryReadWriteIT
implements Serializable {
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT64).addNullableField("name", Schema.FieldType.STRING).addNullableField("arr", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).build();
    private static final Schema SOURCE_SCHEMA_TWO = Schema.builder().addNullableField("c_bigint", Schema.FieldType.INT64).addNullableField("c_tinyint", Schema.FieldType.BYTE).addNullableField("c_smallint", Schema.FieldType.INT16).addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_float", Schema.FieldType.FLOAT).addNullableField("c_double", Schema.FieldType.DOUBLE).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_timestamp", CalciteUtils.TIMESTAMP).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_char", Schema.FieldType.STRING).addNullableField("c_arr", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).build();
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();
    @Rule
    public transient TestBigQuery bigQuery = TestBigQuery.create((Schema)SOURCE_SCHEMA);
    @Rule
    public transient TestBigQuery bigQueryTestingTypes = TestBigQuery.create((Schema)SOURCE_SCHEMA_TWO);

    @Test
    public void testSQLWrite() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        this.bigQueryTestingTypes.assertThatAllRows(SOURCE_SCHEMA_TWO).now(Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testSQLRead_withExport() throws IOException {
        this.bigQueryTestingTypes.insertRows(SOURCE_SCHEMA_TWO, new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'TBLPROPERTIES '{ " + "method" + ": \"" + BigQueryIO.TypedRead.Method.EXPORT.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String selectTableStatement = "SELECT * FROM TEST";
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)sqlEnv.parseQuery(selectTableStatement));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        String selectTableStatement = "SELECT * FROM TEST";
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)sqlEnv.parseQuery(selectTableStatement));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead_WithWriteDispositionEmpty() throws IOException {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'TBLPROPERTIES '{ " + "writeDisposition" + ": \"" + BigQueryIO.Write.WriteDisposition.WRITE_EMPTY.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        this.bigQueryTestingTypes.assertThatAllRows(SOURCE_SCHEMA_TWO).now(Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testSQLWriteAndRead_WithWriteDispositionTruncate() throws IOException {
        this.bigQueryTestingTypes.insertRows(SOURCE_SCHEMA_TWO, new Row[]{this.row(SOURCE_SCHEMA_TWO, 8223372036854775807L, (byte)0, (short)26892, 1462973245, Float.valueOf(2.0f), 2.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'TBLPROPERTIES '{ " + "writeDisposition" + ": \"" + BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        this.bigQueryTestingTypes.assertThatAllRows(SOURCE_SCHEMA_TWO).now(Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testSQLWriteAndRead_WithWriteDispositionAppend() throws IOException {
        this.bigQueryTestingTypes.insertRows(SOURCE_SCHEMA_TWO, new Row[]{this.row(SOURCE_SCHEMA_TWO, 8223372036854775807L, (byte)0, (short)26892, 1462973245, Float.valueOf(2.0f), 2.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ " + "writeDisposition" + ": \"" + BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        this.bigQueryTestingTypes.assertThatAllRows(SOURCE_SCHEMA_TWO).now(Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456")), this.row(SOURCE_SCHEMA_TWO, 8223372036854775807L, (byte)0, (short)26892, 1462973245, Float.valueOf(2.0f), 2.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testSQLWriteAndRead_withExport() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ " + "method" + ": \"" + BigQueryIO.TypedRead.Method.EXPORT.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        String selectTableStatement = "SELECT * FROM TEST";
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)sqlEnv.parseQuery(selectTableStatement));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead_withDirectRead() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ " + "method" + ": \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        String selectTableStatement = "SELECT * FROM TEST";
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)sqlEnv.parseQuery(selectTableStatement));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLRead_withDirectRead_withProjectPushDown() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ " + "method" + ": \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        String selectTableStatement = "SELECT c_integer, c_varchar, c_tinyint FROM TEST";
        BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)relNode);
        MatcherAssert.assertThat((Object)relNode, (Matcher)Matchers.instanceOf(BeamPushDownIOSourceRel.class));
        MatcherAssert.assertThat((Object)relNode.getRowType().getFieldNames(), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"c_tinyint", "c_integer", "c_varchar"}));
        MatcherAssert.assertThat((Object)output.getSchema(), (Matcher)Matchers.equalTo((Object)Schema.builder().addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_tinyint", Schema.FieldType.BYTE).build()));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(output.getSchema(), Integer.MAX_VALUE, "varchar", (byte)127)});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLRead_withDirectRead_withProjectAndFilterPushDown() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ " + "method" + ": \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        String selectTableStatement = "SELECT c_varchar, c_integer FROM TEST where c_tinyint=127";
        BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)relNode);
        MatcherAssert.assertThat((Object)relNode, (Matcher)Matchers.instanceOf(BeamPushDownIOSourceRel.class));
        MatcherAssert.assertThat((Object)relNode.getRowType().getFieldNames(), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"c_varchar", "c_integer"}));
        MatcherAssert.assertThat((Object)output.getSchema(), (Matcher)Matchers.equalTo((Object)Schema.builder().addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_integer", Schema.FieldType.INT32).build()));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{this.row(output.getSchema(), "varchar", Integer.MAX_VALUE)});
        PipelineResult.State state = this.readPipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)state, (Matcher)Matchers.equalTo((Object)PipelineResult.State.DONE));
    }

    @Test
    public void testSQLTypes() {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])";
        sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        MatcherAssert.assertThat((Object)this.bigQueryTestingTypes.getFlatJsonRows(SOURCE_SCHEMA_TWO), (Matcher)Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testInsertSelect() throws Exception {
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{this.readOnlyTableProvider((Pipeline)this.pipeline, "ORDERS_IN_MEMORY", this.row(SOURCE_SCHEMA, 1L, "foo", Arrays.asList("111", "aaa")), this.row(SOURCE_SCHEMA, 2L, "bar", Arrays.asList("222", "bbb")), this.row(SOURCE_SCHEMA, 3L, "baz", Arrays.asList("333", "ccc"))), new BigQueryTableProvider()});
        String createTableStatement = "CREATE EXTERNAL TABLE ORDERS_BQ( \n   id BIGINT, \n   name VARCHAR, \n    arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQuery.tableSpec() + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO ORDERS_BQ \n SELECT \n    id as `id`, \n    name as `name`, \n    arr as `arr` \n FROM ORDERS_IN_MEMORY";
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)sqlEnv.parseQuery(insertStatement));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)5L));
        List allJsonRows = this.bigQuery.getFlatJsonRows(SOURCE_SCHEMA);
        MatcherAssert.assertThat((Object)allJsonRows, (Matcher)Matchers.containsInAnyOrder((Object[])new Row[]{this.row(SOURCE_SCHEMA, 1L, "foo", Arrays.asList("111", "aaa")), this.row(SOURCE_SCHEMA, 2L, "bar", Arrays.asList("222", "bbb")), this.row(SOURCE_SCHEMA, 3L, "baz", Arrays.asList("333", "ccc"))}));
    }

    private TableProvider readOnlyTableProvider(Pipeline pipeline, String tableName, Row ... rows) {
        return new ReadOnlyTableProvider("PCOLLECTION", (Map)ImmutableMap.of((Object)tableName, (Object)new BeamPCollectionTable(this.createPCollection(pipeline, rows))));
    }

    private PCollection<Row> createPCollection(Pipeline pipeline, Row ... rows) {
        return (PCollection)pipeline.apply((PTransform)Create.of(Arrays.asList(rows)).withRowSchema(SOURCE_SCHEMA));
    }

    private Row row(Schema schema, Object ... values) {
        return Row.withSchema((Schema)schema).addValues(values).build();
    }
}

