package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.class */
public class BigQueryReadWriteIT implements Serializable {
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT64).addNullableField("name", Schema.FieldType.STRING).addNullableField("arr", Schema.FieldType.array(Schema.FieldType.STRING)).build();
    private static final Schema SOURCE_SCHEMA_TWO = Schema.builder().addNullableField("c_bigint", Schema.FieldType.INT64).addNullableField("c_tinyint", Schema.FieldType.BYTE).addNullableField("c_smallint", Schema.FieldType.INT16).addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_float", Schema.FieldType.FLOAT).addNullableField("c_double", Schema.FieldType.DOUBLE).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_timestamp", CalciteUtils.TIMESTAMP).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_char", Schema.FieldType.STRING).addNullableField("c_arr", Schema.FieldType.array(Schema.FieldType.STRING)).build();

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);

    @Rule
    public transient TestBigQuery bigQueryTestingTypes = TestBigQuery.create(SOURCE_SCHEMA_TWO);

    @Test
    public void testSQLWrite() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        this.bigQueryTestingTypes.assertThatAllRows(SOURCE_SCHEMA_TWO).now(Matchers.containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testSQLRead() throws IOException {
        this.bigQueryTestingTypes.insertRows(SOURCE_SCHEMA_TWO, new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'");
        PAssert.that(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST"))).containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        PAssert.that(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST"))).containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead_withExport() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ method: \"" + BigQueryIO.TypedRead.Method.EXPORT.toString() + "\" }'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        PAssert.that(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST"))).containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLWriteAndRead_withDirectRead() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ method: \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        PAssert.that(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST"))).containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLRead_withDirectRead_withProjectPushDown() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ method: \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        BeamRelNode parseQuery = inMemory.parseQuery("SELECT c_integer, c_varchar, c_tinyint FROM TEST");
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.readPipeline, parseQuery);
        MatcherAssert.assertThat(parseQuery, Matchers.instanceOf(BeamCalcRel.class));
        MatcherAssert.assertThat(parseQuery.getInput(0), Matchers.instanceOf(BeamIOSourceRel.class));
        MatcherAssert.assertThat(parseQuery.getInput(0).getRowType().getFieldNames(), Matchers.containsInAnyOrder(new String[]{"c_tinyint", "c_integer", "c_varchar"}));
        MatcherAssert.assertThat(pCollection.getSchema(), Matchers.equalTo(Schema.builder().addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_tinyint", Schema.FieldType.BYTE).build()));
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{row(pCollection.getSchema(), Integer.MAX_VALUE, "varchar", Byte.MAX_VALUE)});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLRead_withDirectRead_withProjectAndFilterPushDown() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "' \nTBLPROPERTIES '{ method: \"" + BigQueryIO.TypedRead.Method.DIRECT_READ.toString() + "\" }'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        BeamCalcRel parseQuery = inMemory.parseQuery("SELECT c_varchar, c_integer FROM TEST where c_tinyint=127");
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.readPipeline, parseQuery);
        MatcherAssert.assertThat(parseQuery, Matchers.instanceOf(BeamCalcRel.class));
        TestCase.assertNull(parseQuery.getProgram().getCondition());
        MatcherAssert.assertThat(parseQuery.getInput(0), Matchers.instanceOf(BeamIOSourceRel.class));
        MatcherAssert.assertThat(parseQuery.getInput(0).getRowType().getFieldNames(), Matchers.containsInAnyOrder(new String[]{"c_varchar", "c_integer"}));
        MatcherAssert.assertThat(pCollection.getSchema(), Matchers.equalTo(Schema.builder().addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_integer", Schema.FieldType.INT32).build()));
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{row(pCollection.getSchema(), "varchar", Integer.MAX_VALUE)});
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testSQLTypes() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n   c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_timestamp TIMESTAMP, \n   c_varchar VARCHAR, \n    c_char CHAR, \n   c_arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQueryTestingTypes.tableSpec() + "'");
        inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, TIMESTAMP '2018-05-28 20:17:40.123', 'varchar', 'char', ARRAY['123', '456'])"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        MatcherAssert.assertThat(this.bigQueryTestingTypes.getFlatJsonRows(SOURCE_SCHEMA_TWO), Matchers.containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA_TWO, Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, DateTimeUtils.parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", Arrays.asList("123", "456"))}));
    }

    @Test
    public void testInsertSelect() throws Exception {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{readOnlyTableProvider(this.pipeline, "ORDERS_IN_MEMORY", row(SOURCE_SCHEMA, 1L, "foo", Arrays.asList("111", "aaa")), row(SOURCE_SCHEMA, 2L, "bar", Arrays.asList("222", "bbb")), row(SOURCE_SCHEMA, 3L, "baz", Arrays.asList("333", "ccc"))), new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE ORDERS_BQ( \n   id BIGINT, \n   name VARCHAR, \n    arr ARRAY<VARCHAR> \n) \nTYPE 'bigquery' \nLOCATION '" + this.bigQuery.tableSpec() + "'");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO ORDERS_BQ \n SELECT \n    id as `id`, \n    name as `name`, \n    arr as `arr` \n FROM ORDERS_IN_MEMORY"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        MatcherAssert.assertThat(this.bigQuery.getFlatJsonRows(SOURCE_SCHEMA), Matchers.containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA, 1L, "foo", Arrays.asList("111", "aaa")), row(SOURCE_SCHEMA, 2L, "bar", Arrays.asList("222", "bbb")), row(SOURCE_SCHEMA, 3L, "baz", Arrays.asList("333", "ccc"))}));
    }

    private TableProvider readOnlyTableProvider(Pipeline pipeline, String str, Row... rowArr) {
        return new ReadOnlyTableProvider("PCOLLECTION", ImmutableMap.of(str, new BeamPCollectionTable(createPCollection(pipeline, rowArr))));
    }

    private PCollection<Row> createPCollection(Pipeline pipeline, Row... rowArr) {
        return pipeline.apply(Create.of(Arrays.asList(rowArr)).withRowSchema(SOURCE_SCHEMA));
    }

    private Row row(Schema schema, Object... objArr) {
        return Row.withSchema(schema).addValues(objArr).build();
    }
}
