package org.apache.beam.sdk.extensions.sql.zetasql;

import com.google.protobuf.ByteString;
import com.google.zetasql.StructType;
import com.google.zetasql.TypeFactory;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.class */
public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private PCollection<Row> execute(String str, QueryPlanner.QueryParameters queryParameters) {
        return BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(str, queryParameters));
    }

    private PCollection<Row> execute(String str) {
        return execute(str, QueryPlanner.QueryParameters.ofNone());
    }

    private PCollection<Row> execute(String str, Map<String, Value> map) {
        return execute(str, QueryPlanner.QueryParameters.ofNamed(map));
    }

    private PCollection<Row> execute(String str, List<Value> list) {
        return execute(str, QueryPlanner.QueryParameters.ofPositional(list));
    }

    @Before
    public void setUp() {
        initialize();
    }

    @Test
    public void testSimpleSelect() {
        PAssert.that(execute("SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build()).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryPlannerClass() {
        PAssert.that(this.pipeline.apply(SqlTransform.query("SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);").withQueryPlannerClass(ZetaSQLQueryPlanner.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build()).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testPlannerNamePipelineOption() {
        this.pipeline.getOptions().as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
        PAssert.that(this.pipeline.apply(SqlTransform.query("SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build()).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testByteLiterals() {
        PAssert.that(execute("SELECT b'abc'")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("ColA", Schema.FieldType.BYTES).build()).addValues(new Object[]{new byte[]{97, 98, 99}}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testByteString() {
        PAssert.that(execute("SELECT @p0 IS NULL AS ColA", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createBytesValue(ByteString.copyFrom(new byte[]{98}))).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("ColA", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringLiterals() {
        PAssert.that(execute("SELECT '\"America/Los_Angeles\"\\n'")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("ColA", Schema.FieldType.STRING).build()).addValues(new Object[]{"\"America/Los_Angeles\"\n"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testParameterString() {
        PAssert.that(execute("SELECT ?", (List<Value>) ImmutableList.of(Value.createStringValue("abc\n")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("ColA", Schema.FieldType.STRING).build()).addValues(new Object[]{"abc\n"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ1() {
        PAssert.that(execute("SELECT @p0 = @p1 AS ColA", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_BOOL)).put("p1", Value.createBoolValue(true)).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ2() {
        PAssert.that(execute("SELECT @p0 = @p1 AS ColA", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createDoubleValue(0.0d)).put("p1", Value.createDoubleValue(Double.POSITIVE_INFINITY)).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addBooleanField("field1").build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ3() {
        PAssert.that(execute("SELECT @p0 = @p1 AS ColA", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_DOUBLE)).put("p1", Value.createDoubleValue(3.14d)).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ4() {
        PAssert.that(execute("SELECT @p0 = @p1 AS ColA", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createBytesValue(ByteString.copyFromUtf8("hello"))).put("p1", Value.createBytesValue(ByteString.copyFromUtf8("hello"))).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ5() {
        PAssert.that(execute("SELECT b'hello' = b'hello' AS ColA")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEQ6() {
        PAssert.that(execute("SELECT ? = ? AS ColA", (List<Value>) ImmutableList.of(Value.createInt64Value(4L), Value.createInt64Value(5L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIn() {
        PAssert.that(execute("SELECT 'b' IN ('a', 'b', 'c')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addBooleanField("f_bool").build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testInArray() {
        PAssert.that(execute("SELECT 'b' IN UNNEST(['a', 'b', 'c'])")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addBooleanField("f_bool").build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIsNotNull1() {
        PAssert.that(execute("SELECT @p0 IS NOT NULL AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIsNotNull2() {
        PAssert.that(execute("SELECT @p0 IS NOT NULL AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createNullValue(TypeFactory.createArrayType(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIsNotNull3() {
        PAssert.that(execute("SELECT @p0 IS NOT NULL AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createNullValue(TypeFactory.createStructType(Arrays.asList(new StructType.StructField("a", TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_STRING)))))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIfBasic() {
        PAssert.that(execute("SELECT IF(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createBoolValue(true), "p1", Value.createInt64Value(1L), "p2", Value.createInt64Value(2L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIfPositional() {
        PAssert.that(execute("SELECT IF(?, ?, ?) AS ColA", (List<Value>) ImmutableList.of(Value.createBoolValue(true), Value.createInt64Value(1L), Value.createInt64Value(2L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCoalesceBasic() {
        PAssert.that(execute("SELECT COALESCE(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createStringValue("yay"), "p2", Value.createStringValue("nay")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{"yay"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCoalesceSingleArgument() {
        PAssert.that(execute("SELECT COALESCE(@p0) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_INT64)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.array(Schema.FieldType.INT64)).build()).addValue((Object) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCoalesceNullArray() {
        PAssert.that(execute("SELECT COALESCE(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createNullValue(TypeFactory.createArrayType(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64))), "p1", Value.createNullValue(TypeFactory.createArrayType(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.array(Schema.FieldType.INT64)).build()).addValue((Object) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNullIfCoercion() {
        PAssert.that(execute("SELECT NULLIF(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createInt64Value(3L), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_DOUBLE)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.DOUBLE).build()).addValue(Double.valueOf(3.0d)).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCoalesceNullStruct() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COALESCE(NULL, STRUCT(\"a\" AS s, -33 AS i))"));
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("s", Schema.FieldType.STRING), Schema.Field.of("i", Schema.FieldType.INT64)});
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.row(of)).build()).addValue(Row.withSchema(of).addValues(new Object[]{"a", -33L}).build()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIfTimestamp() {
        PAssert.that(execute("SELECT IF(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createBoolValue(false), "p1", Value.createTimestampValueFromUnixMicros(0L), "p2", Value.createTimestampValueFromUnixMicros(DateTime.parse("2019-01-01T00:00:00Z").getMillis() * 1000)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.DATETIME).build()).addValues(new Object[]{DateTime.parse("2019-01-01T00:00:00Z")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("$make_array is not implemented")
    public void testMakeArray() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT [s3, s1, s2] FROM (SELECT \"foo\" AS s1, \"bar\" AS s2, \"baz\" AS s3);"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.array(Schema.FieldType.STRING)).build()).addValue(ImmutableList.of("baz", "foo", "bar")).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNullIfPositive() {
        PAssert.that(execute("SELECT NULLIF(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("null"), "p1", Value.createStringValue("null")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValue((Object) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNullIfNegative() {
        PAssert.that(execute("SELECT NULLIF(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("foo"), "p1", Value.createStringValue("null")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{"foo"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIfNullPositive() {
        PAssert.that(execute("SELECT IFNULL(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("foo"), "p1", Value.createStringValue("default")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{"foo"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIfNullNegative() {
        PAssert.that(execute("SELECT IFNULL(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createStringValue("yay")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{"yay"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEmptyArrayParameter() {
        PAssert.that(execute("SELECT @p0 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createArrayValue(TypeFactory.createArrayType(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)), ImmutableList.of())))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addArrayField("field1", Schema.FieldType.INT64).build()).addValue(ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEmptyArrayLiteral() {
        PAssert.that(execute("SELECT ARRAY<STRING>[];")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addArrayField("field1", Schema.FieldType.STRING).build()).addValue(ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLike1() {
        PAssert.that(execute("SELECT @p0 LIKE @p1 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("ab%"), "p1", Value.createStringValue("ab\\%")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLikeNullPattern() {
        PAssert.that(execute("SELECT @p0 LIKE @p1 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("ab%"), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLikeAllowsEscapingNonSpecialCharacter() {
        PAssert.that(execute("SELECT @p0 LIKE  @p1 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("ab"), "p1", Value.createStringValue("\\ab")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLikeAllowsEscapingBackslash() {
        PAssert.that(execute("SELECT @p0 LIKE  @p1 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("a\\c"), "p1", Value.createStringValue("a\\\\c")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLikeBytes() {
        PAssert.that(execute("SELECT @p0 LIKE  @p1 AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createBytesValue(ByteString.copyFromUtf8("abcd")), "p1", Value.createBytesValue(ByteString.copyFromUtf8("__%"))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSimpleUnionAll() {
        PCollection<Row> execute = execute("SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING)  UNION ALL  SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);");
        Schema build = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build(), Row.withSchema(build).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testThreeWayUnionAll() {
        PCollection<Row> execute = execute("SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3)");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSimpleUnionDISTINCT() {
        PAssert.that(execute("SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING)  UNION DISTINCT  SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build()).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLInnerJoin() {
        PAssert.that(execute("SELECT t1.Key FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on  t1.Key = t2.RowKey AND t1.ts = t2.ts")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLInnerJoinWithUsing() {
        PAssert.that(execute("SELECT t1.Key FROM KeyValue AS t1 INNER JOIN BigTable AS t2 USING(ts)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLInnerJoinTwo() {
        PAssert.that(execute("SELECT t2.RowKey FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on  t2.RowKey = t1.Key AND t2.ts = t1.ts")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLLeftOuterJoin() {
        PAssert.that(execute("SELECT * FROM KeyValue AS t1 LEFT JOIN BigTable AS t2 on  t1.Key = t2.RowKey")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addNullableField("field4", Schema.FieldType.INT64).addNullableField("field5", Schema.FieldType.STRING).addNullableField("field6", Schema.FieldType.DATETIME).build()).addValues(new Object[]{14L, "KeyValue234", new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), null, null, null}).build(), Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build()).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLRightOuterJoin() {
        PAssert.that(execute("SELECT * FROM KeyValue AS t1 RIGHT JOIN BigTable AS t2 on  t1.Key = t2.RowKey")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).addNullableField("field2", Schema.FieldType.STRING).addNullableField("field3", Schema.FieldType.DATETIME).addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build()).addValues(new Object[]{null, null, null, 16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build()).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLFullOuterJoin() {
        PAssert.that(execute("SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 on  t1.Key = t2.RowKey")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).addNullableField("field2", Schema.FieldType.STRING).addNullableField("field3", Schema.FieldType.DATETIME).addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build()).addValues(new Object[]{null, null, null, 16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build()).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addNullableField("field4", Schema.FieldType.INT64).addNullableField("field5", Schema.FieldType.STRING).addNullableField("field6", Schema.FieldType.DATETIME).build()).addValues(new Object[]{14L, "KeyValue234", new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), null, null, null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("BeamSQL only supports equal join")
    public void testZetaSQLFullOuterJoinTwo() {
        BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 on  t1.Key + t2.RowKey = 30"));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLFullOuterJoinFalse() {
        BeamRelNode convertToBeamRel = new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 ON false");
        this.thrown.expect(UnsupportedOperationException.class);
        BeamSqlRelUtils.toPCollection(this.pipeline, convertToBeamRel);
    }

    @Test
    public void testZetaSQLThreeWayInnerJoin() {
        PAssert.that(execute("SELECT t3.Value, t2.Value, t1.Value, t1.Key, t3.ColId FROM KeyValue as t1 JOIN BigTable as t2 ON (t1.Key = t2.RowKey) JOIN Spanner as t3 ON (t3.ColId = t1.Key)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("t3.Value").addStringField("t2.Value").addStringField("t1.Value").addInt64Field("t1.Key").addInt64Field("t3.ColId").build()).addValues(new Object[]{"Spanner235", "BigTable235", "KeyValue235", 15L, 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLTableJoinOnItselfWithFiltering() {
        PAssert.that(execute("SELECT * FROM Spanner as t1 JOIN Spanner as t2 ON (t1.ColId = t2.ColId) WHERE t1.ColId = 17")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addInt64Field("field3").addStringField("field4").build()).addValues(new Object[]{17L, "Spanner237", 17L, "Spanner237"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromSelect() {
        PCollection<Row> execute = execute("SELECT * FROM (SELECT \"apple\" AS fruit, \"carrot\" AS vegetable);");
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").addStringField("field2").build()).addValues(new Object[]{"apple", "carrot"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
        Schema schema = execute.getSchema();
        Assert.assertEquals(2L, schema.getFieldCount());
        Assert.assertEquals("fruit", schema.getField(0).getName());
        Assert.assertEquals("vegetable", schema.getField(1).getName());
    }

    @Test
    public void testZetaSQLSelectFromTable() {
        PCollection<Row> execute = execute("SELECT Key, Value FROM KeyValue;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableLimit() {
        PCollection<Row> execute = execute("SELECT Key, Value FROM KeyValue LIMIT 2;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableLimit0() {
        PAssert.that(execute("SELECT Key, Value FROM KeyValue LIMIT 0;")).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectNullLimitParam() {
        ImmutableMap of = ImmutableMap.of("lmt", Value.createNullValue(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Limit requires non-null count and offset");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT Key, Value FROM KeyValue LIMIT @lmt;", of);
    }

    @Test
    public void testZetaSQLSelectNullOffsetParam() {
        ImmutableMap of = ImmutableMap.of("lmt", Value.createNullValue(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Limit requires non-null count and offset");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT Key, Value FROM KeyValue LIMIT 1 OFFSET @lmt;", of);
    }

    @Test
    public void testZetaSQLSelectFromTableOrderLimit() {
        PAssert.that(execute("SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addInt64Field("field2").build()).addValues(new Object[]{0L, 0L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableLimitOffset() {
        PAssert.that(execute("SELECT COUNT(a) FROM (\nSELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3) LIMIT 3 OFFSET 1);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableOrderByLimit() {
        PCollection<Row> execute = execute("SELECT Key, Value FROM KeyValue ORDER BY Key DESC LIMIT 2;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableOrderByNoSelectLimit() {
        PCollection<Row> execute = execute("SELECT Value FROM KeyValue ORDER BY Key DESC LIMIT 2;");
        Schema build = Schema.builder().addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableOrderBy() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("ORDER BY without a LIMIT is not supported.");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT Key, Value FROM KeyValue ORDER BY Key DESC;");
    }

    @Test
    public void testZetaSQLSelectFromTableWithStructType2() {
        PAssert.that(execute("SELECT table_with_struct.struct_col.struct_col_str FROM table_with_struct WHERE id = 1;")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field").build()).addValue("row_one").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLStructFieldAccessInFilter() {
        PAssert.that(execute("SELECT table_with_struct.id FROM table_with_struct WHERE table_with_struct.struct_col.struct_col_str = 'row_one';")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field").build()).addValue(1L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLStructFieldAccessInCast() {
        PAssert.that(execute("SELECT CAST(table_with_struct.id AS STRING) FROM table_with_struct WHERE table_with_struct.struct_col.struct_col_str = 'row_one';")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field").build()).addValue("1").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("[https://github.com/apache/beam/issues/20101] CAST operator does not work fully due to bugs in unparsing")
    public void testZetaSQLStructFieldAccessInCast2() {
        PAssert.that(execute("SELECT CAST(A.struct_col.struct_col_str AS TIMESTAMP) FROM table_with_struct_ts_string AS A")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("field").build()).addValue(DateTimeUtils.parseTimestampWithUTCTimeZone("2019-01-15 13:21:03")).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testAggregateWithAndWithoutColumnRefs() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT \n  id, \n  SUM(has_f1) as f1_count, \n  SUM(has_f2) as f2_count, \n  SUM(has_f3) as f3_count, \n  SUM(has_f4) as f4_count, \n  SUM(has_f5) as f5_count, \n  COUNT(*) as count, \n  SUM(has_f6) as f6_count  \nFROM (select 0 as id, 1 as has_f1, 2 as has_f2, 3 as has_f3, 4 as has_f4, 5 as has_f5, 6 as has_f6)\nGROUP BY id"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("id").addInt64Field("f1_count").addInt64Field("f2_count").addInt64Field("f3_count").addInt64Field("f4_count").addInt64Field("f5_count").addInt64Field("count").addInt64Field("f6_count").build()).addValues(new Object[]{0L, 1L, 2L, 3L, 4L, 5L, 1L, 6L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLStructFieldAccessInGroupBy() {
        PCollection<Row> execute = execute("SELECT rowCol.row_id, COUNT(*) FROM table_with_struct_two GROUP BY rowCol.row_id");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLAnyValueInGroupBy() {
        PCollection<Row> execute = execute("SELECT rowCol.row_id as key, ANY_VALUE(rowCol.data) as any_value FROM table_with_struct_two GROUP BY rowCol.row_id");
        HashMap hashMap = new HashMap();
        hashMap.put(1L, Arrays.asList("data1"));
        hashMap.put(2L, Arrays.asList("data2"));
        hashMap.put(3L, Arrays.asList("data2", "data3"));
        PAssert.that(execute).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Row row = (Row) it.next();
                List list = (List) hashMap.remove(row.getInt64("key"));
                Assert.assertTrue(list != null);
                Assert.assertTrue(list.contains(row.getString("any_value")));
            }
            Assert.assertTrue(hashMap.isEmpty());
            return null;
        });
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLStructFieldAccessInGroupBy2() {
        PCollection<Row> execute = execute("SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two GROUP BY rowCol.data");
        Schema build = Schema.builder().addStringField("field1").addInt64Field("field2").addInt64Field("field3").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"data1", 1L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{"data2", 3L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{"data3", 3L, 3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLStructFieldAccessInnerJoin() {
        PCollection<Row> execute = execute("SELECT A.rowCol.data FROM table_with_struct_two AS A INNER JOIN table_with_struct AS B ON A.rowCol.row_id = B.id");
        Schema build = Schema.builder().addStringField("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue("data1").build(), Row.withSchema(build).addValue("data2").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectFromTableWithArrayType() {
        PCollection<Row> execute = execute("SELECT array_col FROM table_with_array;");
        Schema build = Schema.builder().addArrayField("field", Schema.FieldType.STRING).build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(Arrays.asList("1", "2", "3")).build(), Row.withSchema(build).addValue(ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSelectStarFromTable() {
        PCollection<Row> execute = execute("SELECT * FROM BigTable;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicFiltering() {
        PAssert.that(execute("SELECT Key, Value FROM KeyValue WHERE Key = 14;")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").build()).addValues(new Object[]{14L, "KeyValue234"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicFilteringTwo() {
        PAssert.that(execute("SELECT Key, Value FROM KeyValue WHERE Key = 14 AND Value = 'non-existing';")).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicFilteringThree() {
        PCollection<Row> execute = execute("SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLCountOnAColumn() {
        PAssert.that(execute("SELECT COUNT(Key) FROM KeyValue")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLAggDistinct() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Does not support COUNT DISTINCT");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT Key, COUNT(DISTINCT Value) FROM KeyValue GROUP BY Key");
    }

    @Test
    public void testZetaSQLBasicAgg() {
        PCollection<Row> execute = execute("SELECT Key, COUNT(*) FROM KeyValue GROUP BY Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLColumnAlias1() {
        PCollection<Row> execute = execute("SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key");
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
        Schema schema = execute.getSchema();
        Assert.assertEquals(2L, schema.getFieldCount());
        Assert.assertEquals("Key", schema.getField(0).getName());
        Assert.assertEquals("count_col", schema.getField(1).getName());
    }

    @Test
    public void testZetaSQLColumnAlias2() {
        PCollection<Row> execute = execute("SELECT Key AS k1, (count_col + 1) AS k2 FROM (SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key)");
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
        Schema schema = execute.getSchema();
        Assert.assertEquals(2L, schema.getFieldCount());
        Assert.assertEquals("k1", schema.getField(0).getName());
        Assert.assertEquals("k2", schema.getField(1).getName());
    }

    @Test
    public void testZetaSQLColumnAlias3() {
        PCollection<Row> execute = execute("SELECT Key AS v1, Value AS v2, ts AS v3 FROM KeyValue");
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
        Schema schema = execute.getSchema();
        Assert.assertEquals(3L, schema.getFieldCount());
        Assert.assertEquals("v1", schema.getField(0).getName());
        Assert.assertEquals("v2", schema.getField(1).getName());
        Assert.assertEquals("v3", schema.getField(2).getName());
    }

    @Test
    public void testZetaSQLColumnAlias4() {
        PCollection<Row> execute = execute("SELECT CAST(123 AS INT64) AS cast_col");
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
        Schema schema = execute.getSchema();
        Assert.assertEquals(1L, schema.getFieldCount());
        Assert.assertEquals("cast_col", schema.getField(0).getName());
    }

    @Test
    public void testZetaSQLAmbiguousAlias() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expectMessage("Name ID in GROUP BY clause is ambiguous; it may refer to multiple columns in the SELECT-list [at 1:68]");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT row_id as ID, int64_col as ID FROM table_all_types GROUP BY ID;");
    }

    @Test
    public void testZetaSQLAggWithOrdinalReference() {
        PCollection<Row> execute = execute("SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY 1");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 3L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLAggWithAliasReference() {
        PCollection<Row> execute = execute("SELECT Key AS K, COUNT(*) FROM aggregate_test_table GROUP BY K");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 3L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicAgg2() {
        PCollection<Row> execute = execute("SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 3L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicAgg3() {
        PCollection<Row> execute = execute("SELECT Key, Key2, COUNT(*) FROM aggregate_test_table GROUP BY Key2, Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{1L, 11L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 11L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 12L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 13L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicAgg4() {
        PCollection<Row> execute = execute("SELECT Key, Key2, MAX(f_int_1), MIN(f_int_1), SUM(f_int_1), SUM(f_double_1) FROM aggregate_test_table GROUP BY Key2, Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").addInt64Field("field4").addInt64Field("field5").addDoubleField("field6").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 10L, 1L, 1L, 1L, Double.valueOf(1.0d)}).build(), Row.withSchema(build).addValues(new Object[]{1L, 11L, 2L, 2L, 2L, Double.valueOf(2.0d)}).build(), Row.withSchema(build).addValues(new Object[]{2L, 11L, 4L, 3L, 7L, Double.valueOf(7.0d)}).build(), Row.withSchema(build).addValues(new Object[]{2L, 12L, 5L, 5L, 5L, Double.valueOf(5.0d)}).build(), Row.withSchema(build).addValues(new Object[]{3L, 13L, 7L, 6L, 13L, Double.valueOf(13.0d)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicAgg5() {
        PCollection<Row> execute = execute("SELECT Key, Key2, AVG(CAST(f_int_1 AS FLOAT64)), AVG(f_double_1) FROM aggregate_test_table GROUP BY Key2, Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").addDoubleField("field3").addDoubleField("field4").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 10L, Double.valueOf(1.0d), Double.valueOf(1.0d)}).build(), Row.withSchema(build).addValues(new Object[]{1L, 11L, Double.valueOf(2.0d), Double.valueOf(2.0d)}).build(), Row.withSchema(build).addValues(new Object[]{2L, 11L, Double.valueOf(3.5d), Double.valueOf(3.5d)}).build(), Row.withSchema(build).addValues(new Object[]{2L, 12L, Double.valueOf(5.0d), Double.valueOf(5.0d)}).build(), Row.withSchema(build).addValues(new Object[]{3L, 13L, Double.valueOf(6.5d), Double.valueOf(6.5d)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("Calcite infers return type of AVG(int64) as BIGINT while ZetaSQL requires it as either NUMERIC or DOUBLE/FLOAT64")
    public void testZetaSQLTestAVG() {
        PCollection<Row> execute = execute("SELECT Key, AVG(f_int_1)FROM aggregate_test_table GROUP BY Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").addInt64Field("field3").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{1L, 11L, 6L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 11L, 6L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLGroupByExprInSelect() {
        PCollection<Row> execute = execute("SELECT int64_col + 1 FROM table_all_types GROUP BY int64_col + 1;");
        Schema build = Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(0L).build(), Row.withSchema(build).addValue(-1L).build(), Row.withSchema(build).addValue(-2L).build(), Row.withSchema(build).addValue(-3L).build(), Row.withSchema(build).addValue(-4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLGroupByAndFiltering() {
        PAssert.that(execute("SELECT int64_col FROM table_all_types WHERE int64_col = 1 GROUP BY int64_col;")).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() {
        PCollection<Row> execute = execute("SELECT int64_col FROM table_all_types WHERE double_col = 0.5 GROUP BY int64_col;");
        Schema build = Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(-5L).build(), Row.withSchema(build).addValue(-4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicHaving() {
        PAssert.that(execute("SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key HAVING COUNT(*) > 2")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addInt64Field("field2").build()).addValues(new Object[]{2L, 3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLHavingNull() {
        PCollection<Row> execute = execute("SELECT SUM(int64_val) FROM all_null_table GROUP BY primary_key HAVING false");
        Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicFixedWindowing() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COUNT(*) as field_count, TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end FROM KeyValue GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");"));
        Schema build = Schema.builder().addInt64Field("count_start").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLNestedQueryOne() {
        PCollection<Row> execute = execute("SELECT a.Value, a.Key FROM (SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15) as a;");
        Schema build = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLNestedQueryTwo() {
        PCollection<Row> execute = execute("SELECT a.Key, a.Key2, COUNT(*) FROM  (SELECT * FROM aggregate_test_table WHERE Key != 10) as a  GROUP BY a.Key2, a.Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{1L, 11L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 11L, 2L}).build(), Row.withSchema(build).addValues(new Object[]{2L, 12L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{3L, 13L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLNestedQueryThree() {
        PAssert.that(execute("SELECT * FROM (SELECT * FROM KeyValue) AS t1 INNER JOIN (SELECT * FROM BigTable) AS t2 on t1.Key = t2.RowKey")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("Key").addStringField("Value").addDateTimeField("ts").addInt64Field("RowKey").addStringField("Value2").addDateTimeField("ts2").build()).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLNestedQueryFive() {
        PCollection<Row> execute = execute("SELECT a.Value, a.Key FROM (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) as a;");
        Schema build = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testMultipleSelectStatementsThrowsException() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("No additional statements are allowed after a SELECT statement.");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT 1; SELECT 2;");
    }

    @Test
    public void testDistinct() {
        PCollection<Row> execute = execute("SELECT DISTINCT Key2 FROM aggregate_test_table");
        Schema build = Schema.builder().addInt64Field("Key2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{10L}).build(), Row.withSchema(build).addValues(new Object[]{11L}).build(), Row.withSchema(build).addValues(new Object[]{12L}).build(), Row.withSchema(build).addValues(new Object[]{13L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testDistinctOnNull() {
        PAssert.that(execute("SELECT DISTINCT str_val FROM all_null_table")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("str_val", Schema.FieldType.DOUBLE).build()).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testAnyValue() {
        PAssert.that(execute("SELECT ANY_VALUE(double_val) FROM all_null_table")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("double_val", Schema.FieldType.DOUBLE).build()).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectNULL() {
        PAssert.that(execute("SELECT NULL")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("long_val", Schema.FieldType.INT64).build()).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryOne() {
        PAssert.that(execute("With T1 AS (SELECT * FROM KeyValue), T2 AS (SELECT * FROM BigTable) SELECT T2.RowKey FROM T1 INNER JOIN T2 on T1.Key = T2.RowKey;")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryTwo() {
        PCollection<Row> execute = execute("WITH T1 AS (SELECT Key, COUNT(*) as value FROM KeyValue GROUP BY Key) SELECT T1.Key, T1.value FROM T1");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryThree() {
        PCollection<Row> execute = execute("WITH T1 as (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) SELECT T1.Value, T1.Key FROM T1;");
        Schema build = Schema.builder().addStringField("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryFour() {
        PCollection<Row> execute = execute("WITH T1 as (SELECT Value, Key FROM KeyValue) SELECT T1.Value, T1.Key FROM T1 WHERE T1.Key = 14 OR T1.Key = 15;");
        Schema build = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryFive() {
        PCollection<Row> execute = execute("WITH T1 AS (SELECT * FROM KeyValue) SELECT T1.Key, COUNT(*) FROM T1 GROUP BY T1.Key");
        Schema build = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, 1L}).build(), Row.withSchema(build).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQuerySix() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("WITH T1 AS (SELECT * FROM window_test_table_two) SELECT COUNT(*) as field_count, SESSION_START(\"INTERVAL 3 SECOND\") as window_start, SESSION_END(\"INTERVAL 3 SECOND\") as window_end FROM T1 GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");"));
        Schema build = Schema.builder().addInt64Field("count_star").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQuerySeven() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("WITH t1 AS (select 1 AS k), t2 AS (select 1 AS k), t3 AS (select 1 AS k) SELECT COUNT(*) FROM t1 JOIN t3 USING (k)"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQueryEight() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("WITH T AS (SELECT k, 'hello' AS s FROM UNNEST([1, 2, 3]) k) SELECT COUNT(*) FROM T t1 JOIN T t2 USING (k)"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("count").build()).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUNNESTLiteral() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);"));
        Schema build = Schema.builder().addStringField("str_field").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"foo"}).build(), Row.withSchema(build).addValues(new Object[]{"bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestLiteralWithNullElements() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM UNNEST(ARRAY<STRING>['foo', NULL, 'bar']);"));
        Schema build = Schema.builder().addNullableField("str_field", Schema.FieldType.STRING).build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"foo"}).build(), Row.withSchema(build).addValues(new Object[]{(String) null}).build(), Row.withSchema(build).addValues(new Object[]{"bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUNNESTParameters() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM UNNEST(@p0);", ImmutableMap.of("p0", Value.createArrayValue(TypeFactory.createArrayType(TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_STRING)), ImmutableList.of(Value.createStringValue("foo"), Value.createStringValue("bar"))))));
        Schema build = Schema.builder().addStringField("str_field").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"foo"}).build(), Row.withSchema(build).addValues(new Object[]{"bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("[https://github.com/apache/beam/issues/20139] ArrayScanToUncollectConverter Unnest does not support sub-queries")
    public void testUNNESTExpression() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));"));
        Schema build = Schema.builder().addStringField("str_field").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNamedUNNESTLiteral() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT *, T1 FROM UNNEST(ARRAY<STRING>['foo', 'bar']) AS T1"));
        Schema build = Schema.builder().addStringField("str_field").addStringField("str2_field").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"foo", "foo"}).build(), Row.withSchema(build).addValues(new Object[]{"bar", "bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNamedUNNESTLiteralOffset() {
        BeamRelNode convertToBeamRel = new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p");
        this.thrown.expect(UnsupportedOperationException.class);
        BeamSqlRelUtils.toPCollection(this.pipeline, convertToBeamRel);
    }

    @Test
    public void testUnnestArrayColumn() {
        PCollection<Row> execute = execute("SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p");
        Schema build = Schema.builder().addInt64Field("int_field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(14L).build(), Row.withSchema(build).addValue(18L).build(), Row.withSchema(build).addValue(22L).build(), Row.withSchema(build).addValue(24L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructColumn() {
        PCollection<Row> execute = execute("SELECT int_col, data FROM table_with_array_of_struct, UNNEST(array_col) AS s");
        Schema build = Schema.builder().addInt64Field("int_col").addStringField("data").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{10L, "1"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "2"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "3"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructLiteral() {
        PCollection<Row> execute = execute("SELECT a, b FROM UNNEST([STRUCT(1 AS a, '1' AS b), STRUCT(2, '2')])");
        Schema build = Schema.builder().addInt64Field("a").addStringField("b").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, "1"}).build(), Row.withSchema(build).addValues(new Object[]{2L, "2"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStructOfStructPassthrough() {
        PAssert.that(execute("SELECT * FROM table_with_struct_of_struct")).containsInAnyOrder(new Row[]{Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(new Object[]{1L, "1"})}), Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(new Object[]{2L, "2"})})});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStructOfStructSimpleRename() {
        PCollection<Row> execute = execute("SELECT row as not_row FROM table_with_struct_of_struct");
        Schema build = Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).attachValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(new Object[]{1L, "1"})}), Row.withSchema(build).attachValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(new Object[]{2L, "2"})})});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("[BEAM-9378] This should work, but is currently unimplemented.")
    public void testStructOfStructRemap() {
        PCollection<Row> execute = execute("SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct");
        Schema build = Schema.builder().addInt64Field("int_value_remapped").build();
        Schema build2 = Schema.builder().addRowField("remapped", build).build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build2).attachValues(new Object[]{Row.withSchema(build).attachValues(new Object[]{1L})}), Row.withSchema(build2).attachValues(new Object[]{Row.withSchema(build).attachValues(new Object[]{2L})})});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestStructOfStructOfArray() {
        PCollection<Row> execute = execute("SELECT int_col, s FROM table_with_struct_of_struct_of_array, UNNEST(struct_col.struct.arr) as s");
        Schema build = Schema.builder().addInt64Field("int_col").addStringField("p").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{10L, "1"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "2"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "3"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructOfStructColumn() {
        PCollection<Row> execute = execute("SELECT s.row FROM table_with_array_of_struct_of_struct, UNNEST(array_col) as s");
        Schema build = Schema.builder().addRowField("row", TestInput.STRUCT_SCHEMA).build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).addValues(new Object[]{1L, "1"}).build()}).build(), Row.withSchema(build).addValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).addValues(new Object[]{2L, "2"}).build()}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructOfStructLiteral() {
        PCollection<Row> execute = execute("SELECT s.row FROM UNNEST([STRUCT(STRUCT(1, '1') as row), STRUCT(STRUCT(2, '2'))]) as s");
        Schema build = Schema.builder().addRowField("row", TestInput.STRUCT_SCHEMA).build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).addValues(new Object[]{1L, "1"}).build()}).build(), Row.withSchema(build).addValues(new Object[]{Row.withSchema(TestInput.STRUCT_SCHEMA).addValues(new Object[]{2L, "2"}).build()}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestStructOfArrayOfStructColumn() {
        PCollection<Row> execute = execute("SELECT int_col, data FROM table_with_struct_of_array_of_struct, UNNEST(struct_col.arr) as s");
        Schema build = Schema.builder().addInt64Field("int_col").addStringField("p").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{10L, "1"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "2"}).build(), Row.withSchema(build).addValues(new Object[]{20L, "3"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructOfArrayColumn() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT s FROM table_with_array_of_struct_of_array, UNNEST(array_col), UNNEST(arr) AS s"));
        Schema build = Schema.builder().addStringField("s").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"a"}).build(), Row.withSchema(build).addValues(new Object[]{"b"}).build(), Row.withSchema(build).addValues(new Object[]{"c"}).build(), Row.withSchema(build).addValues(new Object[]{"d"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestArrayOfStructOfArrayLiteral() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT b FROM UNNEST([STRUCT([1, 2, 3] AS a)]), UNNEST(a) AS b"));
        Schema build = Schema.builder().addInt64Field("int").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringAggregation() {
        PAssert.that(execute("SELECT STRING_AGG(fruit) AS string_agg FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("string_field").build()).addValue("apple,pear,banana,pear").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringAggregationBytes() {
        PAssert.that(execute("SELECT STRING_AGG(CAST(fruit as bytes)) AS string_agg FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addByteArrayField("bytearray_field").build()).addValue("apple,pear,banana,pear".getBytes(StandardCharsets.UTF_8)).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringAggregationDelimiter() {
        PAssert.that(execute("SELECT STRING_AGG(fruit, \"&\") AS string_agg FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("string_field").build()).addValue("apple&pear&banana&pear").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringAggregationBytesDelimiter() {
        PAssert.that(execute("SELECT STRING_AGG(CAST(fruit as bytes), b\"&\") AS string_agg FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addByteArrayField("bytearray_field").build()).addValue("apple&pear&banana&pear".getBytes(StandardCharsets.UTF_8)).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStringAggregationParamsDelimiter() {
        ImmutableMap build = ImmutableMap.builder().put("separator", Value.createStringValue(",")).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(ZetaSqlException.class);
        zetaSQLQueryPlanner.convertToBeamRel("SELECT string_agg(\"s\", @separator) FROM (SELECT 1)", build);
    }

    @Test
    @Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.")
    public void testNamedUNNESTJoin() {
        PAssert.that(execute("SELECT * FROM table_with_array_for_unnest AS t1 LEFT JOIN UNNEST(t1.int_array_col) AS t2 on  t1.int_col = t2")).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnnestJoinStruct() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel("SELECT b, x FROM UNNEST([STRUCT(true AS b, [3, 5] AS arr), STRUCT(false AS b, [7, 9] AS arr)]) t LEFT JOIN UNNEST(t.arr) x ON b");
    }

    @Test
    public void testUnnestJoinLiteral() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel("SELECT a, b FROM UNNEST([1, 1, 2, 3, 5, 8, 13, NULL]) a JOIN UNNEST([1, 2, 3, 5, 7, 11, 13, NULL]) b ON a = b");
    }

    @Test
    public void testUnnestJoinSubquery() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel("SELECT a, b FROM UNNEST([1, 2, 3]) a JOIN UNNEST(ARRAY(SELECT b FROM UNNEST([3, 2, 1]) b)) b ON a = b");
    }

    @Test
    public void testCaseNoValue() {
        PAssert.that(execute("SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("str_field").build()).addValue("seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCaseWithValue() {
        PAssert.that(execute("SELECT CASE 1 WHEN 2 THEN 'not possible' ELSE 'seems right' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("str_field").build()).addValue("seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCaseWithValueMultipleCases() {
        PAssert.that(execute("SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' ELSE 'also not possible' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("str_field").build()).addValue("seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCaseWithValueNoElse() {
        PAssert.that(execute("SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("str_field").build()).addValue("seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCaseNoValueNoElseNoMatch() {
        PAssert.that(execute("SELECT CASE WHEN 'abc' = '123' THEN 'not possible' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("str_field", Schema.FieldType.STRING).build()).addValue((Object) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCaseWithValueNoElseNoMatch() {
        PAssert.that(execute("SELECT CASE 2 WHEN 1 THEN 'not possible' END")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("str_field", Schema.FieldType.STRING).build()).addValue((Object) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastToDateWithCase() {
        PAssert.that(execute("SELECT f_int, \nCASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n    THEN CAST (CONCAT(\n       SUBSTR(TRIM(f_string), 1, 4) \n        , '-' \n        , SUBSTR(TRIM(f_string), 5, 2) \n        , '-' \n        , SUBSTR(TRIM(f_string), 7, 2)) AS DATE)\n    ELSE NULL\nEND \nFROM table_for_case_when")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("f_long").addNullableField("f_date", Schema.FieldType.logicalType(SqlTypes.DATE)).build()).addValues(new Object[]{1L, LocalDate.parse("2018-10-18")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIntersectAll() {
        PCollection<Row> execute = execute("SELECT Key FROM aggregate_test_table INTERSECT ALL SELECT Key FROM aggregate_test_table_two");
        Schema build = Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIntersectDistinct() {
        PCollection<Row> execute = execute("SELECT Key FROM aggregate_test_table INTERSECT DISTINCT SELECT Key FROM aggregate_test_table_two");
        Schema build = Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testExceptAll() {
        PCollection<Row> execute = execute("SELECT Key FROM aggregate_test_table EXCEPT ALL SELECT Key FROM aggregate_test_table_two");
        Schema build = Schema.builder().addInt64Field("field").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectNullIntersectDistinct() {
        PCollection<Row> execute = execute("SELECT NULL INTERSECT DISTINCT SELECT 2");
        System.err.println("SCHEMA " + execute.getSchema());
        PAssert.that(execute).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectNullIntersectAll() {
        PCollection<Row> execute = execute("SELECT NULL INTERSECT ALL SELECT 2");
        System.err.println("SCHEMA " + execute.getSchema());
        PAssert.that(execute).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectNullExceptDistinct() {
        PCollection<Row> execute = execute("SELECT NULL EXCEPT DISTINCT SELECT 2");
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.nullRow(execute.getSchema())});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectNullExceptAll() {
        PCollection<Row> execute = execute("SELECT NULL EXCEPT ALL SELECT 2");
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.nullRow(execute.getSchema())});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectFromEmptyTable() {
        PAssert.that(execute("SELECT * FROM table_empty;")).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStartsWithString() {
        PAssert.that(execute("SELECT STARTS_WITH('string1', 'stri')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStartsWithString2() {
        PAssert.that(execute("SELECT STARTS_WITH(@p0, @p1)", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).put("p1", Value.createStringValue("")).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testStartsWithString3() {
        PAssert.that(execute("SELECT STARTS_WITH(@p0, @p1)", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).put("p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEndsWithString() {
        PAssert.that(execute("SELECT STARTS_WITH('string1', 'ng0')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEndsWithString2() {
        PAssert.that(execute("SELECT STARTS_WITH(@p0, @p1)", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).put("p1", Value.createStringValue("")).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testEndsWithString3() {
        PAssert.that(execute("SELECT STARTS_WITH(@p0, @p1)", (Map<String, Value>) ImmutableMap.builder().put("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).put("p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)).build())).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build()).addValues(new Object[]{(Boolean) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithOneParameters() {
        PAssert.that(execute("SELECT concat('abc')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithTwoParameters() {
        PAssert.that(execute("SELECT concat('abc', 'def')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abcdef"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithThreeParameters() {
        PAssert.that(execute("SELECT concat('abc', 'def', 'xyz')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abcdefxyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithFourParameters() {
        PAssert.that(execute("SELECT concat('abc', 'def', '  ', 'xyz')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abcdef  xyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithFiveParameters() {
        PAssert.that(execute("SELECT concat('abc', 'def', '  ', 'xyz', 'kkk')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abcdef  xyzkkk"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithSixParameters() {
        PAssert.that(execute("SELECT concat('abc', 'def', '  ', 'xyz', 'kkk', 'ttt')")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abcdef  xyzkkkttt"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithNull1() {
        PAssert.that(execute("SELECT CONCAT(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatWithNull2() {
        PAssert.that(execute("SELECT CONCAT(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNamedParameterQuery() {
        PAssert.that(execute("SELECT @ColA AS ColA", (Map<String, Value>) ImmutableMap.of("ColA", Value.createInt64Value(5L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{5L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testArrayStructLiteral() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT ARRAY<STRUCT<INT64, INT64>>[(11, 12)];"));
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("s", Schema.FieldType.INT64), Schema.Field.of("i", Schema.FieldType.INT64)});
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("field1", Schema.FieldType.array(Schema.FieldType.row(of)))})).addValue(ImmutableList.of(Row.withSchema(of).addValues(new Object[]{11L, 12L}).build())).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testParameterStruct() {
        PCollection<Row> execute = execute("SELECT @p as ColA", (Map<String, Value>) ImmutableMap.of("p", Value.createStructValue(TypeFactory.createStructType(ImmutableList.of(new StructType.StructField("s", TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_STRING)), new StructType.StructField("i", TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_INT64)))), ImmutableList.of(Value.createStringValue("foo"), Value.createInt64Value(1L)))));
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("s", Schema.FieldType.STRING), Schema.Field.of("i", Schema.FieldType.INT64)});
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("field1", Schema.FieldType.row(of))})).addValue(Row.withSchema(of).addValues(new Object[]{"foo", 1L}).build()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testParameterStructNested() {
        StructType createStructType = TypeFactory.createStructType(ImmutableList.of(new StructType.StructField("s", TypeFactory.createSimpleType(ZetaSQLType.TypeKind.TYPE_STRING))));
        PAssert.that(execute("SELECT @outer_struct.inner_struct.s as ColA", (Map<String, Value>) ImmutableMap.of("outer_struct", Value.createStructValue(TypeFactory.createStructType(ImmutableList.of(new StructType.StructField("inner_struct", createStructType))), ImmutableList.of(Value.createStructValue(createStructType, ImmutableList.of(Value.createStringValue("foo")))))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValue("foo").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatNamedParameterQuery() {
        PAssert.that(execute("SELECT CONCAT(@p0, @p1) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createStringValue("A")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"A"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testConcatPositionalParameterQuery() {
        PAssert.that(execute("SELECT CONCAT(?, ?, ?) AS ColA", (List<Value>) ImmutableList.of(Value.createStringValue("a"), Value.createStringValue("b"), Value.createStringValue("c")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testReplace1() {
        PAssert.that(execute("SELECT REPLACE(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createStringValue(""), "p2", Value.createStringValue("a")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{""}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testReplace2() {
        PAssert.that(execute("SELECT REPLACE(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abc"), "p1", Value.createStringValue(""), "p2", Value.createStringValue("xyz")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testReplace3() {
        PAssert.that(execute("SELECT REPLACE(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createStringValue(""), "p2", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testReplace4() {
        PAssert.that(execute("SELECT REPLACE(@p0, @p1, @p2) AS ColA", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p2", Value.createStringValue("")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testTrim1() {
        PAssert.that(execute("SELECT trim(@p0)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("   a b c   ")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"a b c"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testTrim2() {
        PAssert.that(execute("SELECT trim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"xyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testTrim3() {
        PAssert.that(execute("SELECT trim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLTrim1() {
        PAssert.that(execute("SELECT ltrim(@p0)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("   a b c   ")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"a b c   "}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLTrim2() {
        PAssert.that(execute("SELECT ltrim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"xyzab"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testLTrim3() {
        PAssert.that(execute("SELECT ltrim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testRTrim1() {
        PAssert.that(execute("SELECT rtrim(@p0)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("   a b c   ")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"   a b c"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testRTrim2() {
        PAssert.that(execute("SELECT rtrim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"abxyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testRTrim3() {
        PAssert.that(execute("SELECT rtrim(@p0, @p1)", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build()).addValues(new Object[]{(String) null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("https://github.com/apache/beam/issues/20101")
    public void testCastBytesToString1() {
        PAssert.that(execute("SELECT CAST(@p0 AS STRING)", (Map<String, Value>) ImmutableMap.of("p0", Value.createBytesValue(ByteString.copyFromUtf8("`"))))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"`"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastBytesToString2() {
        PAssert.that(execute("SELECT CAST(b'b' AS STRING)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"b"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("https://github.com/apache/beam/issues/20101")
    public void testCastBytesToStringFromTable() {
        PCollection<Row> execute = execute("SELECT CAST(bytes_col AS STRING) FROM table_all_types");
        Schema build = Schema.builder().addStringField("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"1"}).build(), Row.withSchema(build).addValues(new Object[]{"2"}).build(), Row.withSchema(build).addValues(new Object[]{"3"}).build(), Row.withSchema(build).addValues(new Object[]{"4"}).build(), Row.withSchema(build).addValues(new Object[]{"5"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastStringToTimestamp() {
        PAssert.that(execute("SELECT CAST('2019-01-15 13:21:03' AS TIMESTAMP)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("field_1").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone("2019-01-15 13:21:03")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastStringToTimestampWithDefaultTimezoneSet() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        zetaSQLQueryPlanner.setDefaultTimezone("Pacific/Chatham");
        this.pipeline.getOptions().as(BeamSqlPipelineOptions.class).setZetaSqlDefaultTimezone("Pacific/Chatham");
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, zetaSQLQueryPlanner.convertToBeamRel("SELECT CAST('2014-12-01 12:34:56+07:30' AS TIMESTAMP)"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("field_1").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone("2014-12-01 05:04:56")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("https://github.com/apache/beam/issues/20351")
    public void testCastBetweenTimeAndString() {
        PAssert.that(execute("SELECT CAST(s1 as TIME) as t2, CAST(t1 as STRING) as s2 FROM (SELECT '12:34:56.123456' as s1, TIME '12:34:56.123456' as t1)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addLogicalTypeField("t2", SqlTypes.TIME).addStringField("s2").build()).addValues(new Object[]{LocalTime.of(12, 34, 56, 123456000), "12:34:56.123456"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastStringToString() {
        PAssert.that(execute("SELECT CAST(@p0 AS STRING)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{""}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCastStringToInt64() {
        PAssert.that(execute("SELECT CAST(@p0 AS INT64)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("123")))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{123L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectConstant() {
        PAssert.that(execute("SELECT 'hi'")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"hi"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    @Ignore("[https://github.com/apache/beam/issues/19963] ZetaSQL does not support Map type")
    public void testSelectFromTableWithMap() {
        PCollection<Row> execute = execute("SELECT row_field FROM table_with_map");
        Schema build = Schema.builder().addInt64Field("row_id").addStringField("data").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addRowField("row_field", build).build()).addValues(new Object[]{Row.withSchema(build).addValues(new Object[]{1L, "data1"}).build()}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSubQuery() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("Does not support sub-queries");
        zetaSQLQueryPlanner.convertToBeamRel("select sum(Key) from KeyValue\ngroup by (select Key)");
    }

    @Test
    public void testSubstr() {
        PAssert.that(execute("SELECT substr(@p0, @p1, @p2)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abc"), "p1", Value.createInt64Value(-2L), "p2", Value.createInt64Value(1L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"b"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSubstring() {
        PAssert.that(execute("SELECT substring(@p0, @p1, @p2)", (Map<String, Value>) ImmutableMap.of("p0", Value.createStringValue("abc"), "p1", Value.createInt64Value(-2L), "p2", Value.createInt64Value(1L)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"b"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSubstrWithLargeValueExpectException() {
        BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT substr(@p0, @p1, @p2)", ImmutableMap.of("p0", Value.createStringValue("abc"), "p1", Value.createInt64Value(2147483648L), "p2", Value.createInt64Value(-2147483649L))));
        this.thrown.expect(RuntimeException.class);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectAll() {
        PCollection<Row> execute = execute("SELECT ALL Key, Value FROM KeyValue;");
        Schema build = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectDistinct() {
        PCollection<Row> execute = execute("SELECT DISTINCT Key FROM aggregate_test_table;");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L}).build(), Row.withSchema(build).addValues(new Object[]{2L}).build(), Row.withSchema(build).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectDistinct2() {
        PCollection<Row> execute = execute("SELECT DISTINCT val.BYTES\nfrom (select b\"BYTES\" BYTES union all\n      select b\"bytes\" union all\n      select b\"ByTeS\") val");
        Schema build = Schema.builder().addByteArrayField("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"BYTES".getBytes(StandardCharsets.UTF_8)}).build(), Row.withSchema(build).addValues(new Object[]{"ByTeS".getBytes(StandardCharsets.UTF_8)}).build(), Row.withSchema(build).addValues(new Object[]{"bytes".getBytes(StandardCharsets.UTF_8)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectBytes() {
        PAssert.that(execute("SELECT b\"ByTes\"")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addByteArrayField("field1").build()).addValues(new Object[]{"ByTes".getBytes(StandardCharsets.UTF_8)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectExcept() {
        PCollection<Row> execute = execute("SELECT * EXCEPT (Key, ts) FROM KeyValue;");
        Schema build = Schema.builder().addStringField("field2").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{"KeyValue234"}).build(), Row.withSchema(build).addValues(new Object[]{"KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSelectReplace() {
        PAssert.that(execute("WITH orders AS\n  (SELECT 5 as order_id,\n  \"sprocket\" as item_name,\n  200 as quantity)\nSELECT * REPLACE (\"widget\" AS item_name)\nFROM orders")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").addStringField("field2").addInt64Field("field3").build()).addValues(new Object[]{5L, "widget", 200L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnionAllBasic() {
        PCollection<Row> execute = execute("SELECT row_id FROM table_all_types UNION ALL SELECT row_id FROM table_all_types_2");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(1L).build(), Row.withSchema(build).addValue(2L).build(), Row.withSchema(build).addValue(3L).build(), Row.withSchema(build).addValue(4L).build(), Row.withSchema(build).addValue(5L).build(), Row.withSchema(build).addValue(6L).build(), Row.withSchema(build).addValue(7L).build(), Row.withSchema(build).addValue(8L).build(), Row.withSchema(build).addValue(9L).build(), Row.withSchema(build).addValue(10L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testAVGWithLongInput() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("AVG(INT64) is not supported. You might want to use AVG(CAST(expression AS FLOAT64).");
        zetaSQLQueryPlanner.convertToBeamRel("SELECT AVG(f_int_1) FROM aggregate_test_table;");
    }

    @Test
    public void testReverseString() {
        PAssert.that(execute("SELECT REVERSE('abc');")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field2").build()).addValues(new Object[]{"cba"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCharLength() {
        PAssert.that(execute("SELECT CHAR_LENGTH('abc');")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field").build()).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCharLengthNull() {
        PAssert.that(execute("SELECT CHAR_LENGTH(@p0);", (Map<String, Value>) ImmutableMap.of("p0", Value.createSimpleNullValue(ZetaSQLType.TypeKind.TYPE_STRING)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field", Schema.FieldType.INT64).build()).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testTumbleAsTVF() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("select Key, Value, ts, window_start, window_end from TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND')", ImmutableMap.of()));
        Schema build = Schema.builder().addInt64Field("Key").addStringField("Value").addDateTimeField("ts").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L, "KeyValue234", DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:06"), DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:06"), DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07")}).build(), Row.withSchema(build).addValues(new Object[]{15L, "KeyValue235", DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07"), DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01 21:26:07"), DateTimeUtils.parseTimestampWithUTCTimeZone("2018-07-01T21:26:08")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testIsNullTrueFalse() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("WITH Src AS (\n  SELECT NULL as data UNION ALL\n  SELECT TRUE UNION ALL\n  SELECT FALSE\n)\nSELECT\n  data IS NULL as isnull,\n  data IS NOT NULL as isnotnull,\n  data IS TRUE as istrue,\n  data IS NOT TRUE as isnottrue,\n  data IS FALSE as isfalse,\n  data IS NOT FALSE as isnotfalse\nFROM Src\n", ImmutableMap.of()));
        Schema build = Schema.builder().addField("isnull", Schema.FieldType.BOOLEAN).addField("isnotnull", Schema.FieldType.BOOLEAN).addField("istrue", Schema.FieldType.BOOLEAN).addField("isnottrue", Schema.FieldType.BOOLEAN).addField("isfalse", Schema.FieldType.BOOLEAN).addField("isnotfalse", Schema.FieldType.BOOLEAN).build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{true, false, false, true, false, true}).build(), Row.withSchema(build).addValues(new Object[]{false, true, true, false, false, true}).build(), Row.withSchema(build).addValues(new Object[]{false, true, false, true, true, false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitOr() {
        PCollection<Row> execute = execute("SELECT BIT_OR(row_id) FROM table_all_types GROUP BY bool_col");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{3L}).build(), Row.withSchema(build).addValue(7L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitOrNull() {
        PAssert.that(execute("SELECT bit_or(CAST(x as int64)) FROM (SELECT NULL x UNION ALL SELECT 5 UNION ALL SELECT 6);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValue(7L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitAnd() {
        PCollection<Row> execute = execute("SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValue(1L).build(), Row.withSchema(build).addValue(0L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitAndInt64() {
        PAssert.that(execute("SELECT bit_and(CAST(x as int64)) FROM (SELECT 1 x FROM (SELECT 1) WHERE false)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitAndNulls() {
        PAssert.that(execute("SELECT bit_and(CAST(x as int64)) FROM (SELECT NULL x UNION ALL SELECT 5 UNION ALL SELECT 6)")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValue(4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCountEmpty() {
        PAssert.that(execute("SELECT COUNT(x) FROM UNNEST([]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValue(0L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testBitwiseOrEmpty() {
        PAssert.that(execute("SELECT BIT_OR(x) FROM UNNEST([]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testArrayAggNulls() {
        PAssert.that(execute("SELECT ARRAY_AGG(x) FROM UNNEST([1, NULL, 3]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addField(Schema.Field.of("field1", Schema.FieldType.array(Schema.FieldType.of(Schema.TypeName.INT64).withNullable(true)))).build()).addArray(new Object[]{1L, (Long) null, 3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testArrayAggEmpty() {
        PAssert.that(execute("SELECT ARRAY_AGG(x) FROM UNNEST([]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testInt64SumOverflow() {
        BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT SUM(col1)\nFROM (SELECT CAST(9223372036854775807 as int64) as col1 UNION ALL\n      SELECT CAST(1 as int64))\n"));
        this.thrown.expect(RuntimeException.class);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testInt64SumUnderflow() {
        BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT SUM(col1)\nFROM (SELECT CAST(-9223372036854775808 as int64) as col1 UNION ALL\n      SELECT CAST(-1 as int64))\n"));
        this.thrown.expect(RuntimeException.class);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLSumNulls() {
        PAssert.that(execute("SELECT SUM(x) AS sum FROM UNNEST([null, null, null]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSimpleTableName() {
        PCollection<Row> execute = execute("SELECT Key FROM KeyValue");
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(execute).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{14L}).build(), Row.withSchema(build).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitXor() {
        PAssert.that(execute("SELECT BIT_XOR(x) AS bit_xor FROM UNNEST([5678, 1234]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValue(4860L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitXorEmpty() {
        PAssert.that(execute("SELECT bit_xor(CAST(x as int64)) FROM (SELECT 1 x FROM (SELECT 1) WHERE false);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBitXorNull() {
        PAssert.that(execute("SELECT bit_xor(x) FROM (SELECT CAST(NULL AS int64) x);")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build()).addValue((Long) null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testCountIfZetaSQLDialect() {
        PAssert.that(execute("WITH is_positive AS ( SELECT x > 0 flag FROM UNNEST([5, -2, 3, 6, -10, -7, 4, 0]) AS x) SELECT COUNTIF(flag) FROM is_positive")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValue(4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testArrayAggZetasql() {
        PAssert.that(execute("SELECT ARRAY_AGG(x) AS array_agg FROM UNNEST([1, 2, 3, 4, 5]) AS x")).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addArrayField("array_field", Schema.FieldType.INT64).build()).addArray(new Object[]{1L, 2L, 3L, 4L, 5L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 964779441:
                if (implMethodName.equals("lambda$testZetaSQLAnyValueInGroupBy$b546ee3c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    Map map = (Map) serializedLambda.getCapturedArg(0);
                    return iterable -> {
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            Row row = (Row) it.next();
                            List list = (List) map.remove(row.getInt64("key"));
                            Assert.assertTrue(list != null);
                            Assert.assertTrue(list.contains(row.getString("any_value")));
                        }
                        Assert.assertTrue(map.isEmpty());
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
