/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.zetasql;

import com.google.protobuf.ByteString;
import com.google.zetasql.ArrayType;
import com.google.zetasql.SqlException;
import com.google.zetasql.StructType;
import com.google.zetasql.Type;
import com.google.zetasql.TypeFactory;
import com.google.zetasql.Value;
import com.google.zetasql.ZetaSQLType;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils;
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLTestBase;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class ZetaSQLDialectSpecTest
extends ZetaSQLTestBase {
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() {
        this.initializeBeamTableProvider();
        this.initializeCalciteEnvironment();
    }

    @Test
    public void testSimpleSelect() {
        String sql = "SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryPlannerClass() {
        String sql = "SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);";
        PCollection stream = (PCollection)this.pipeline.apply((PTransform)SqlTransform.query((String)sql).withQueryPlannerClass(ZetaSQLQueryPlanner.class));
        Schema schema = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testPlannerNamePipelineOption() {
        ((BeamSqlPipelineOptions)this.pipeline.getOptions().as(BeamSqlPipelineOptions.class)).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
        String sql = "SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);";
        PCollection stream = (PCollection)this.pipeline.apply((PTransform)SqlTransform.query((String)sql));
        Schema schema = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testByteLiterals() {
        String sql = "SELECT b'abc'";
        byte[] byteString = new byte[]{97, 98, 99};
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("ColA", Schema.FieldType.BYTES).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{byteString}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testByteString() {
        String sql = "SELECT @p0 IS NULL AS ColA";
        ByteString byteString = ByteString.copyFrom((byte[])new byte[]{98});
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createBytesValue((ByteString)byteString)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("ColA", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testFloat() {
        String sql = "SELECT 3.0";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("ColA", Schema.FieldType.DOUBLE).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{3.0}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testStringLiterals() {
        String sql = "SELECT '\"America/Los_Angeles\"\\n'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("ColA", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"\"America/Los_Angeles\"\n"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParameterString() {
        String sql = "SELECT ?";
        ImmutableList params = ImmutableList.of((Object)Value.createStringValue((String)"abc\n"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (List)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("ColA", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abc\n"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
    public void testEQ1() {
        String sql = "SELECT @p0 = @p1 AS ColA";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_BOOL)).put((Object)"p1", (Object)Value.createBoolValue((boolean)true)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Does not support inf/-inf/nan in double/float literals because double/float literals are converted to BigDecimal in Calcite codegen.")
    public void testEQ2() {
        String sql = "SELECT @p0 = @p1 AS ColA";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createDoubleValue((double)0.0)).put((Object)"p1", (Object)Value.createDoubleValue((double)Double.POSITIVE_INFINITY)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addBooleanField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
    public void testEQ3() {
        String sql = "SELECT @p0 = @p1 AS ColA";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_DOUBLE)).put((Object)"p1", (Object)Value.createDoubleValue((double)3.14)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEQ4() {
        String sql = "SELECT @p0 = @p1 AS ColA";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createBytesValue((ByteString)ByteString.copyFromUtf8((String)"hello"))).put((Object)"p1", (Object)Value.createBytesValue((ByteString)ByteString.copyFromUtf8((String)"hello"))).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEQ5() {
        String sql = "SELECT b'hello' = b'hello' AS ColA";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEQ6() {
        String sql = "SELECT ? = ? AS ColA";
        ImmutableList params = ImmutableList.of((Object)Value.createInt64Value((long)4L), (Object)Value.createInt64Value((long)5L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (List)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIsNotNull1() {
        String sql = "SELECT @p0 IS NOT NULL AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIsNotNull2() {
        String sql = "SELECT @p0 IS NOT NULL AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createNullValue((Type)TypeFactory.createArrayType((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIsNotNull3() {
        String sql = "SELECT @p0 IS NOT NULL AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createNullValue((Type)TypeFactory.createStructType(Arrays.asList(new StructType.StructField("a", (Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING))))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIfBasic() {
        String sql = "SELECT IF(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createBoolValue((boolean)true), (Object)"p1", (Object)Value.createInt64Value((long)1L), (Object)"p2", (Object)Value.createInt64Value((long)2L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIfPositional() {
        String sql = "SELECT IF(?, ?, ?) AS ColA";
        ImmutableList params = ImmutableList.of((Object)Value.createBoolValue((boolean)true), (Object)Value.createInt64Value((long)1L), (Object)Value.createInt64Value((long)2L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (List)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCoalesceBasic() {
        String sql = "SELECT COALESCE(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createStringValue((String)"yay"), (Object)"p2", (Object)Value.createStringValue((String)"nay"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"yay"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCoalesceSingleArgument() {
        String sql = "SELECT COALESCE(@p0) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.INT64)).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue(null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCoalesceNullArray() {
        String sql = "SELECT COALESCE(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createNullValue((Type)TypeFactory.createArrayType((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64))), (Object)"p1", (Object)Value.createNullValue((Type)TypeFactory.createArrayType((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.INT64)).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue(null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
    public void testNullIfCoercion() {
        String sql = "SELECT NULLIF(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createInt64Value((long)3L), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_DOUBLE));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.DOUBLE).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)3.0).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCoalesceNullStruct() {
        String sql = "SELECT COALESCE(NULL, STRUCT(\"a\" AS s, -33 AS i))";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema innerSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"s", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"i", (Schema.FieldType)Schema.FieldType.INT64)});
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.row((Schema)innerSchema)).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)Row.withSchema((Schema)innerSchema).addValues(new Object[]{"a", -33L}).build()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIfTimestamp() {
        String sql = "SELECT IF(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createBoolValue((boolean)false), (Object)"p1", (Object)Value.createTimestampValueFromUnixMicros((long)0L), (Object)"p2", (Object)Value.createTimestampValueFromUnixMicros((long)(DateTime.parse((String)"2019-01-01T00:00:00Z").getMillis() * 1000L)));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.DATETIME).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTime.parse((String)"2019-01-01T00:00:00Z")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="$make_array is not implemented")
    public void testMakeArray() {
        String sql = "SELECT [s3, s1, s2] FROM (SELECT \"foo\" AS s1, \"bar\" AS s2, \"baz\" AS s3);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)ImmutableList.of((Object)"baz", (Object)"foo", (Object)"bar")).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testNullIfPositive() {
        String sql = "SELECT NULLIF(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"null"), (Object)"p1", (Object)Value.createStringValue((String)"null"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue(null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testNullIfNegative() {
        String sql = "SELECT NULLIF(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"foo"), (Object)"p1", (Object)Value.createStringValue((String)"null"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"foo"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIfNullPositive() {
        String sql = "SELECT IFNULL(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"foo"), (Object)"p1", (Object)Value.createStringValue((String)"default"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"foo"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIfNullNegative() {
        String sql = "SELECT IFNULL(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createStringValue((String)"yay"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"yay"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEmptyArrayParameter() {
        String sql = "SELECT @p0 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createArrayValue((ArrayType)TypeFactory.createArrayType((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64)), (Collection)ImmutableList.of()));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addArrayField("field1", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEmptyArrayLiteral() {
        String sql = "SELECT ARRAY<STRING>[];";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addArrayField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLike1() {
        String sql = "SELECT @p0 LIKE  @p1 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"ab%"), (Object)"p1", (Object)Value.createStringValue((String)"ab\\%"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-9182] NULL parameters do not work in BeamZetaSqlCalcRel")
    public void testLikeNullPattern() {
        String sql = "SELECT @p0 LIKE  @p1 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"ab%"), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLikeAllowsEscapingNonSpecialCharacter() {
        String sql = "SELECT @p0 LIKE  @p1 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"ab"), (Object)"p1", (Object)Value.createStringValue((String)"\\ab"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLikeAllowsEscapingBackslash() {
        String sql = "SELECT @p0 LIKE  @p1 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"a\\c"), (Object)"p1", (Object)Value.createStringValue((String)"a\\\\c"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLikeBytes() {
        String sql = "SELECT @p0 LIKE  @p1 AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createBytesValue((ByteString)ByteString.copyFromUtf8((String)"abcd")), (Object)"p1", (Object)Value.createBytesValue((ByteString)ByteString.copyFromUtf8((String)"__%")));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testMod() {
        String sql = "SELECT MOD(4, 2)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{0L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSimpleUnionAll() {
        String sql = "SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING)  UNION ALL  SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testThreeWayUnionAll() {
        String sql = "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSimpleUnionDISTINCT() {
        String sql = "SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING)  UNION DISTINCT  SELECT CAST (1243 as INT64), CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), CAST ('string' as STRING);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addDateTimeField("field2").addStringField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1243L, new DateTime(2018, 9, 15, 12, 59, 59, (Chronology)ISOChronology.getInstanceUTC()), "string"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLInnerJoin() {
        String sql = "SELECT t1.Key FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on  t1.Key = t2.RowKey AND t1.ts = t2.ts";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLInnerJoinWithUsing() {
        String sql = "SELECT t1.Key FROM KeyValue AS t1 INNER JOIN BigTable AS t2 USING(ts)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLInnerJoinTwo() {
        String sql = "SELECT t2.RowKey FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on  t2.RowKey = t1.Key AND t2.ts = t1.ts";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLLeftOuterJoin() {
        String sql = "SELECT * FROM KeyValue AS t1 LEFT JOIN BigTable AS t2 on  t1.Key = t2.RowKey";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schemaOne = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addNullableField("field4", Schema.FieldType.INT64).addNullableField("field5", Schema.FieldType.STRING).addNullableField("field6", Schema.FieldType.DATETIME).build();
        Schema schemaTwo = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schemaOne).addValues(new Object[]{14L, "KeyValue234", new DateTime(2018, 7, 1, 21, 26, 6, (Chronology)ISOChronology.getInstanceUTC()), null, null, null}).build(), Row.withSchema((Schema)schemaTwo).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLRightOuterJoin() {
        String sql = "SELECT * FROM KeyValue AS t1 RIGHT JOIN BigTable AS t2 on  t1.Key = t2.RowKey";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schemaOne = Schema.builder().addNullableField("field1", Schema.FieldType.INT64).addNullableField("field2", Schema.FieldType.STRING).addNullableField("field3", Schema.FieldType.DATETIME).addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build();
        Schema schemaTwo = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schemaOne).addValues(new Object[]{null, null, null, 16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schemaTwo).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLFullOuterJoin() {
        String sql = "SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 on  t1.Key = t2.RowKey";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schemaOne = Schema.builder().addNullableField("field1", Schema.FieldType.INT64).addNullableField("field2", Schema.FieldType.STRING).addNullableField("field3", Schema.FieldType.DATETIME).addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build();
        Schema schemaTwo = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addInt64Field("field4").addStringField("field5").addDateTimeField("field6").build();
        Schema schemaThree = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").addNullableField("field4", Schema.FieldType.INT64).addNullableField("field5", Schema.FieldType.STRING).addNullableField("field6", Schema.FieldType.DATETIME).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schemaOne).addValues(new Object[]{null, null, null, 16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schemaTwo).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schemaThree).addValues(new Object[]{14L, "KeyValue234", new DateTime(2018, 7, 1, 21, 26, 6, (Chronology)ISOChronology.getInstanceUTC()), null, null, null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="BeamSQL only supports equal join")
    public void testZetaSQLFullOuterJoinTwo() {
        String sql = "SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 on  t1.Key + t2.RowKey = 30";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLFullOuterJoinFalse() {
        String sql = "SELECT * FROM KeyValue AS t1 FULL JOIN BigTable AS t2 ON false";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        this.thrown.expect(UnsupportedOperationException.class);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
    }

    @Test
    public void testZetaSQLThreeWayInnerJoin() {
        String sql = "SELECT t3.Value, t2.Value, t1.Value, t1.Key, t3.ColId FROM KeyValue as t1 JOIN BigTable as t2 ON (t1.Key = t2.RowKey) JOIN Spanner as t3 ON (t3.ColId = t1.Key)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("t3.Value").addStringField("t2.Value").addStringField("t1.Value").addInt64Field("t1.Key").addInt64Field("t3.ColId").build()).addValues(new Object[]{"Spanner235", "BigTable235", "KeyValue235", 15L, 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLTableJoinOnItselfWithFiltering() {
        String sql = "SELECT * FROM Spanner as t1 JOIN Spanner as t2 ON (t1.ColId = t2.ColId) WHERE t1.ColId = 17";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").addStringField("field2").addInt64Field("field3").addStringField("field4").build()).addValues(new Object[]{17L, "Spanner237", 17L, "Spanner237"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromSelect() {
        String sql = "SELECT * FROM (SELECT \"apple\" AS fruit, \"carrot\" AS vegetable);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"apple", "carrot"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
        Schema outputSchema = stream.getSchema();
        Assert.assertEquals((long)2L, (long)outputSchema.getFieldCount());
        Assert.assertEquals((Object)"fruit", (Object)outputSchema.getField(0).getName());
        Assert.assertEquals((Object)"vegetable", (Object)outputSchema.getField(1).getName());
    }

    @Test
    public void testZetaSQLSelectFromTable() {
        String sql = "SELECT Key, Value FROM KeyValue;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableLimit() {
        String sql = "SELECT Key, Value FROM KeyValue LIMIT 2;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableLimit0() {
        String sql = "SELECT Key, Value FROM KeyValue LIMIT 0;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[0]);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectNullLimitParam() {
        String sql = "SELECT Key, Value FROM KeyValue LIMIT @lmt;";
        ImmutableMap params = ImmutableMap.of((Object)"lmt", (Object)Value.createNullValue((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64)));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Limit requires non-null count and offset");
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
    }

    @Test
    public void testZetaSQLSelectNullOffsetParam() {
        String sql = "SELECT Key, Value FROM KeyValue LIMIT 1 OFFSET @lmt;";
        ImmutableMap params = ImmutableMap.of((Object)"lmt", (Object)Value.createNullValue((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64)));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Limit requires non-null count and offset");
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
    }

    @Test
    public void testZetaSQLSelectFromTableOrderLimit() {
        String sql = "SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{0L, 0L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableLimitOffset() {
        String sql = "SELECT COUNT(a) FROM (\nSELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3) LIMIT 3 OFFSET 1);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableOrderByLimit() {
        String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC LIMIT 2;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableOrderBy() {
        String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("ORDER BY without a LIMIT is not supported.");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testZetaSQLSelectFromTableWithStructType2() {
        String sql = "SELECT table_with_struct.struct_col.struct_col_str FROM table_with_struct WHERE id = 1;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)"row_one").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLStructFieldAccessInFilter() {
        String sql = "SELECT table_with_struct.id FROM table_with_struct WHERE table_with_struct.struct_col.struct_col_str = 'row_one';";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)1L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLStructFieldAccessInCast() {
        String sql = "SELECT CAST(table_with_struct.id AS STRING) FROM table_with_struct WHERE table_with_struct.struct_col.struct_col_str = 'row_one';";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)"1").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-9191] CAST operator does not work fully due to bugs in unparsing")
    public void testZetaSQLStructFieldAccessInCast2() {
        String sql = "SELECT CAST(A.struct_col.struct_col_str AS TIMESTAMP) FROM table_with_struct_ts_string AS A";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2019-01-15 13:21:03")).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testAggregateWithAndWithoutColumnRefs() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        String sql = "SELECT \n  id, \n  SUM(has_f1) as f1_count, \n  SUM(has_f2) as f2_count, \n  SUM(has_f3) as f3_count, \n  SUM(has_f4) as f4_count, \n  SUM(has_f5) as f5_count, \n  COUNT(*) as count, \n  SUM(has_f6) as f6_count  \nFROM (select 0 as id, 1 as has_f1, 2 as has_f2, 3 as has_f3, 4 as has_f4, 5 as has_f5, 6 as has_f6)\nGROUP BY id";
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("id").addInt64Field("f1_count").addInt64Field("f2_count").addInt64Field("f3_count").addInt64Field("f4_count").addInt64Field("f5_count").addInt64Field("count").addInt64Field("f6_count").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{0L, 1L, 2L, 3L, 4L, 5L, 1L, 6L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLStructFieldAccessInGroupBy() {
        String sql = "SELECT rowCol.row_id, COUNT(*) FROM table_with_struct_two GROUP BY rowCol.row_id";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLAnyValueInGroupBy() {
        String sql = "SELECT rowCol.row_id as key, ANY_VALUE(rowCol.data) as any_value FROM table_with_struct_two GROUP BY rowCol.row_id";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        HashMap<Long, List<String>> allowedTuples = new HashMap<Long, List<String>>();
        allowedTuples.put(1L, Arrays.asList("data1"));
        allowedTuples.put(2L, Arrays.asList("data2"));
        allowedTuples.put(3L, Arrays.asList("data2", "data3"));
        PAssert.that((PCollection)stream).satisfies((SerializableFunction & Serializable)input -> {
            for (Row row : input) {
                List values = (List)allowedTuples.remove(row.getInt64("key"));
                Assert.assertTrue((values != null ? 1 : 0) != 0);
                Assert.assertTrue((boolean)values.contains(row.getString("any_value")));
            }
            Assert.assertTrue((boolean)allowedTuples.isEmpty());
            return null;
        });
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLStructFieldAccessInGroupBy2() {
        String sql = "SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two GROUP BY rowCol.data";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").addInt64Field("field2").addInt64Field("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"data1", 1L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"data2", 3L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"data3", 3L, 3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLStructFieldAccessInnerJoin() {
        String sql = "SELECT A.rowCol.data FROM table_with_struct_two AS A INNER JOIN table_with_struct AS B ON A.rowCol.row_id = B.id";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)"data1").build(), Row.withSchema((Schema)schema).addValue((Object)"data2").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectFromTableWithArrayType() {
        String sql = "SELECT array_col FROM table_with_array;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addArrayField("field", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue(Arrays.asList("1", "2", "3")).build(), Row.withSchema((Schema)schema).addValue((Object)ImmutableList.of()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLSelectStarFromTable() {
        String sql = "SELECT * FROM BigTable;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").addDateTimeField("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{16L, "BigTable236", new DateTime(2018, 7, 1, 21, 26, 8, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicFiltering() {
        String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").addStringField("field2").build()).addValues(new Object[]{14L, "KeyValue234"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicFilteringTwo() {
        String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 AND Value = 'non-existing';";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[0]);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicFilteringThree() {
        String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLCountOnAColumn() {
        String sql = "SELECT COUNT(Key) FROM KeyValue";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLAggDistinct() {
        String sql = "SELECT Key, COUNT(DISTINCT Value) FROM KeyValue GROUP BY Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Does not support COUNT DISTINCT");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testZetaSQLBasicAgg() {
        String sql = "SELECT Key, COUNT(*) FROM KeyValue GROUP BY Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLColumnAlias1() {
        String sql = "SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
        Schema outputSchema = stream.getSchema();
        Assert.assertEquals((long)2L, (long)outputSchema.getFieldCount());
        Assert.assertEquals((Object)"Key", (Object)outputSchema.getField(0).getName());
        Assert.assertEquals((Object)"count_col", (Object)outputSchema.getField(1).getName());
    }

    @Test
    public void testZetaSQLColumnAlias2() {
        String sql = "SELECT Key AS k1, (count_col + 1) AS k2 FROM (SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
        Schema outputSchema = stream.getSchema();
        Assert.assertEquals((long)2L, (long)outputSchema.getFieldCount());
        Assert.assertEquals((Object)"k1", (Object)outputSchema.getField(0).getName());
        Assert.assertEquals((Object)"k2", (Object)outputSchema.getField(1).getName());
    }

    @Test
    public void testZetaSQLColumnAlias3() {
        String sql = "SELECT Key AS v1, Value AS v2, ts AS v3 FROM KeyValue";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
        Schema outputSchema = stream.getSchema();
        Assert.assertEquals((long)3L, (long)outputSchema.getFieldCount());
        Assert.assertEquals((Object)"v1", (Object)outputSchema.getField(0).getName());
        Assert.assertEquals((Object)"v2", (Object)outputSchema.getField(1).getName());
        Assert.assertEquals((Object)"v3", (Object)outputSchema.getField(2).getName());
    }

    @Test
    public void testZetaSQLColumnAlias4() {
        String sql = "SELECT CAST(123 AS INT64) AS cast_col";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
        Schema outputSchema = stream.getSchema();
        Assert.assertEquals((long)1L, (long)outputSchema.getFieldCount());
        Assert.assertEquals((Object)"cast_col", (Object)outputSchema.getField(0).getName());
    }

    @Test
    public void testZetaSQLAmbiguousAlias() {
        String sql = "SELECT row_id as ID, int64_col as ID FROM table_all_types GROUP BY ID;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expectMessage("Name ID in GROUP BY clause is ambiguous; it may refer to multiple columns in the SELECT-list [at 1:68]");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testZetaSQLAggWithOrdinalReference() {
        String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY 1";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 3L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLAggWithAliasReference() {
        String sql = "SELECT Key AS K, COUNT(*) FROM aggregate_test_table GROUP BY K";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 3L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicAgg2() {
        String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 3L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicAgg3() {
        String sql = "SELECT Key, Key2, COUNT(*) FROM aggregate_test_table GROUP BY Key2, Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, 11L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 11L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 12L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 13L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicAgg4() {
        String sql = "SELECT Key, Key2, MAX(f_int_1), MIN(f_int_1), SUM(f_int_1), SUM(f_double_1) FROM aggregate_test_table GROUP BY Key2, Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").addInt64Field("field4").addInt64Field("field5").addDoubleField("field6").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 10L, 1L, 1L, 1L, 1.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, 11L, 2L, 2L, 2L, 2.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 11L, 4L, 3L, 7L, 7.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 12L, 5L, 5L, 5L, 5.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 13L, 7L, 6L, 13L, 13.0}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicAgg5() {
        String sql = "SELECT Key, Key2, AVG(CAST(f_int_1 AS FLOAT64)), AVG(f_double_1) FROM aggregate_test_table GROUP BY Key2, Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").addDoubleField("field3").addDoubleField("field4").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 10L, 1.0, 1.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, 11L, 2.0, 2.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 11L, 3.5, 3.5}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 12L, 5.0, 5.0}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 13L, 6.5, 6.5}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Calcite infers return type of AVG(int64) as BIGINT while ZetaSQL requires it as either NUMERIC or DOUBLE/FLOAT64")
    public void testZetaSQLTestAVG() {
        String sql = "SELECT Key, AVG(f_int_1)FROM aggregate_test_table GROUP BY Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").addInt64Field("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, 11L, 6L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 11L, 6L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLGroupByExprInSelect() {
        String sql = "SELECT int64_col + 1 FROM table_all_types GROUP BY int64_col + 1;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)0L).build(), Row.withSchema((Schema)schema).addValue((Object)-1L).build(), Row.withSchema((Schema)schema).addValue((Object)-2L).build(), Row.withSchema((Schema)schema).addValue((Object)-3L).build(), Row.withSchema((Schema)schema).addValue((Object)-4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLGroupByAndFiltering() {
        String sql = "SELECT int64_col FROM table_all_types WHERE int64_col = 1 GROUP BY int64_col;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[0]);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() {
        String sql = "SELECT int64_col FROM table_all_types WHERE double_col = 0.5 GROUP BY int64_col;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)-5L).build(), Row.withSchema((Schema)schema).addValue((Object)-4L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicHaving() {
        String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key HAVING COUNT(*) > 2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2L, 3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLHavingNull() {
        String sql = "SELECT SUM(int64_val) FROM all_null_table GROUP BY primary_key HAVING false";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBasicFixedWindowing() {
        String sql = "SELECT COUNT(*) as field_count, TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end FROM KeyValue GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("count_start").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 8, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 6, (Chronology)ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLNestedQueryOne() {
        String sql = "SELECT a.Value, a.Key FROM (SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15) as a;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLNestedQueryTwo() {
        String sql = "SELECT a.Key, a.Key2, COUNT(*) FROM  (SELECT * FROM aggregate_test_table WHERE Key != 10) as a  GROUP BY a.Key2, a.Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field3").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L, 10L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{1L, 11L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 11L, 2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, 12L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L, 13L, 2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLNestedQueryThree() {
        String sql = "SELECT * FROM (SELECT * FROM KeyValue) AS t1 INNER JOIN (SELECT * FROM BigTable) AS t2 on t1.Key = t2.RowKey";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("Key").addStringField("Value").addDateTimeField("ts").addInt64Field("RowKey").addStringField("Value2").addDateTimeField("ts2").build()).addValues(new Object[]{15L, "KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC()), 15L, "BigTable235", new DateTime(2018, 7, 1, 21, 26, 7, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLNestedQueryFive() {
        String sql = "SELECT a.Value, a.Key FROM (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) as a;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateLiteral() {
        String sql = "SELECT DATE '2020-3-30'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2020, 3, 30)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateColumn() {
        String sql = "SELECT FORMAT_DATE('%b-%d-%Y', date_field) FROM table_with_date";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_date_str").build()).addValues(new Object[]{"Dec-25-2008"}).build(), Row.withSchema((Schema)Schema.builder().addStringField("f_date_str").build()).addValues(new Object[]{"Apr-07-2020"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testExtractDate() {
        String sql = "WITH Dates AS (\n  SELECT DATE '2015-12-31' AS date UNION ALL\n  SELECT DATE '2016-01-01'\n)\nSELECT\n  EXTRACT(ISOYEAR FROM date) AS isoyear,\n  EXTRACT(YEAR FROM date) AS year,\n  EXTRACT(ISOWEEK FROM date) AS isoweek,\n  EXTRACT(MONTH FROM date) AS month\nFROM Dates\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addField("isoyear", Schema.FieldType.INT64).addField("year", Schema.FieldType.INT64).addField("isoweek", Schema.FieldType.INT64).addField("month", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2015L, 2015L, 53L, 12L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2015L, 2016L, 53L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateFromYearMonthDay() {
        String sql = "SELECT DATE(2008, 12, 25)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2008, 12, 25)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateFromTimestamp() {
        String sql = "SELECT DATE(TIMESTAMP '2016-12-25 05:30:00+07', 'America/Los_Angeles')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2016, 12, 24)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateAdd() {
        String sql = "SELECT DATE_ADD(DATE '2008-12-25', INTERVAL 5 DAY), DATE_ADD(DATE '2008-12-25', INTERVAL 1 MONTH), DATE_ADD(DATE '2008-12-25', INTERVAL 1 YEAR), ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date1", SqlTypes.DATE).addLogicalTypeField("f_date2", SqlTypes.DATE).addLogicalTypeField("f_date3", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2008, 12, 30), LocalDate.of(2009, 1, 25), LocalDate.of(2009, 12, 25)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateSub() {
        String sql = "SELECT DATE_SUB(DATE '2008-12-25', INTERVAL 5 DAY), DATE_SUB(DATE '2008-12-25', INTERVAL 1 MONTH), DATE_SUB(DATE '2008-12-25', INTERVAL 1 YEAR), ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date1", SqlTypes.DATE).addLogicalTypeField("f_date2", SqlTypes.DATE).addLogicalTypeField("f_date3", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2008, 12, 20), LocalDate.of(2008, 11, 25), LocalDate.of(2007, 12, 25)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateDiff() {
        String sql = "SELECT DATE_DIFF(DATE '2010-07-07', DATE '2008-12-25', DAY)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_date_diff").build()).addValues(new Object[]{559L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateDiffNegativeResult() {
        String sql = "SELECT DATE_DIFF(DATE '2017-12-17', DATE '2017-12-18', ISOWEEK)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_date_diff").build()).addValues(new Object[]{-1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateTrunc() {
        String sql = "SELECT DATE_TRUNC(DATE '2015-06-15', ISOYEAR)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date_trunc", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2014, 12, 29)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testFormatDate() {
        String sql = "SELECT FORMAT_DATE('%b-%d-%Y', DATE '2008-12-25')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_date_str").build()).addValues(new Object[]{"Dec-25-2008"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParseDate() {
        String sql = "SELECT PARSE_DATE('%m %d %y', '10 14 18')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2018, 10, 14)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateToUnixInt64() {
        String sql = "SELECT UNIX_DATE(DATE '2008-12-25')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_unix_date").build()).addValues(new Object[]{14238L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDateFromUnixInt64() {
        String sql = "SELECT DATE_FROM_UNIX_DATE(14238)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2008, 12, 25)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeLiteral() {
        String sql = "SELECT TIME '15:30:00', TIME '15:30:00.135246' ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time1", SqlTypes.TIME).addLogicalTypeField("f_time2", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 30, 0)}).addValues(new Object[]{LocalTime.of(15, 30, 0, 135246000)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeColumn() {
        String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_time_str").build()).addValues(new Object[]{"15:30:00"}).build(), Row.withSchema((Schema)Schema.builder().addStringField("f_time_str").build()).addValues(new Object[]{"23:35:59"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testExtractTime() {
        String sql = "SELECT EXTRACT(HOUR FROM TIME '15:30:35.123456') as hour, EXTRACT(MINUTE FROM TIME '15:30:35.123456') as minute, EXTRACT(SECOND FROM TIME '15:30:35.123456') as second, EXTRACT(MILLISECOND FROM TIME '15:30:35.123456') as millisecond, EXTRACT(MICROSECOND FROM TIME '15:30:35.123456') as microsecond ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addField("hour", Schema.FieldType.INT64).addField("minute", Schema.FieldType.INT64).addField("second", Schema.FieldType.INT64).addField("millisecond", Schema.FieldType.INT64).addField("microsecond", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{15L, 30L, 35L, 123L, 123456L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeFromHourMinuteSecond() {
        String sql = "SELECT TIME(15, 30, 0)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 30, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeFromTimestamp() {
        String sql = "SELECT TIME(TIMESTAMP '2008-12-25 15:30:00+08', 'America/Los_Angeles')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(23, 30, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeAdd() {
        String sql = "SELECT TIME_ADD(TIME '15:30:00', INTERVAL 10 MICROSECOND), TIME_ADD(TIME '15:30:00', INTERVAL 10 MILLISECOND), TIME_ADD(TIME '15:30:00', INTERVAL 10 SECOND), TIME_ADD(TIME '15:30:00', INTERVAL 10 MINUTE), TIME_ADD(TIME '15:30:00', INTERVAL 10 HOUR) ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time1", SqlTypes.TIME).addLogicalTypeField("f_time2", SqlTypes.TIME).addLogicalTypeField("f_time3", SqlTypes.TIME).addLogicalTypeField("f_time4", SqlTypes.TIME).addLogicalTypeField("f_time5", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 30, 0, 10000), LocalTime.of(15, 30, 0, 10000000), LocalTime.of(15, 30, 10, 0), LocalTime.of(15, 40, 0, 0), LocalTime.of(1, 30, 0, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeSub() {
        String sql = "SELECT TIME_SUB(TIME '15:30:00', INTERVAL 10 MICROSECOND), TIME_SUB(TIME '15:30:00', INTERVAL 10 MILLISECOND), TIME_SUB(TIME '15:30:00', INTERVAL 10 SECOND), TIME_SUB(TIME '15:30:00', INTERVAL 10 MINUTE), TIME_SUB(TIME '15:30:00', INTERVAL 10 HOUR) ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time1", SqlTypes.TIME).addLogicalTypeField("f_time2", SqlTypes.TIME).addLogicalTypeField("f_time3", SqlTypes.TIME).addLogicalTypeField("f_time4", SqlTypes.TIME).addLogicalTypeField("f_time5", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 29, 59, 999990000), LocalTime.of(15, 29, 59, 990000000), LocalTime.of(15, 29, 50, 0), LocalTime.of(15, 20, 0, 0), LocalTime.of(5, 30, 0, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeDiff() {
        String sql = "SELECT TIME_DIFF(TIME '15:30:00', TIME '14:35:00', MINUTE)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_time_diff").build()).addValues(new Object[]{55L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeDiffNegativeResult() {
        String sql = "SELECT TIME_DIFF(TIME '14:35:00', TIME '15:30:00', MINUTE)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_time_diff").build()).addValues(new Object[]{-55L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimeTrunc() {
        String sql = "SELECT TIME_TRUNC(TIME '15:30:35', HOUR)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time_trunc", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 0, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testFormatTime() {
        String sql = "SELECT FORMAT_TIME('%R', TIME '15:30:00')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_time_str").build()).addValues(new Object[]{"15:30"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParseTime() {
        String sql = "SELECT PARSE_TIME('%H', '15')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build()).addValues(new Object[]{LocalTime.of(15, 0, 0)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="https://jira.apache.org/jira/browse/BEAM-10340")
    public void testCastBetweenTimeAndString() {
        String sql = "SELECT CAST(s1 as TIME) as t2, CAST(t1 as STRING) as s2 FROM (SELECT '12:34:56.123456' as s1, TIME '12:34:56.123456' as t1)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addLogicalTypeField("t2", SqlTypes.TIME).addStringField("s2").build()).addValues(new Object[]{LocalTime.of(12, 34, 56, 123456000), "12:34:56.123456"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampMicrosecondUnsupported() {
        String sql = "WITH Timestamps AS (\n  SELECT TIMESTAMP '2000-01-01 00:11:22.345678+00' as timestamp\n)\nSELECT\n  timestamp,\n  EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n  EXTRACT(YEAR FROM timestamp) AS year,\n  EXTRACT(ISOWEEK FROM timestamp) AS week,\n  EXTRACT(MINUTE FROM timestamp) AS minute\nFROM Timestamps\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testTimestampLiteralWithoutTimeZone() {
        String sql = "SELECT TIMESTAMP '2016-12-25 05:30:00'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("field1").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2016-12-25 05:30:00")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampLiteralWithUTCTimeZone() {
        String sql = "SELECT TIMESTAMP '2016-12-25 05:30:00+00'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("field1").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2016-12-25 05:30:00")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectNullIntersectDistinct() {
        String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        System.err.println("SCHEMA " + stream.getSchema());
        PAssert.that((PCollection)stream).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectNullIntersectAll() {
        String sql = "SELECT NULL INTERSECT ALL SELECT 2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        System.err.println("SCHEMA " + stream.getSchema());
        PAssert.that((PCollection)stream).empty();
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectNullExceptDistinct() {
        String sql = "SELECT NULL EXCEPT DISTINCT SELECT 2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.nullRow((Schema)stream.getSchema())});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectNullExceptAll() {
        String sql = "SELECT NULL EXCEPT ALL SELECT 2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.nullRow((Schema)stream.getSchema())});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testMultipleSelectStatementsThrowsException() {
        String sql = "SELECT 1; SELECT 2;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("No additional statements are allowed after a SELECT statement.");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testAlreadyDefinedUDFThrowsException() {
        String sql = "CREATE FUNCTION foo() AS (0); CREATE FUNCTION foo() AS (1); SELECT foo();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(ParseException.class);
        this.thrown.expectMessage("Failed to define function foo");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testCreateFunctionNoSelectThrowsException() {
        String sql = "CREATE FUNCTION plusOne(x INT64) AS (x + 1);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("Statement list must end in a SELECT statement, not CreateFunctionStmt");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testNullaryUdf() {
        String sql = "CREATE FUNCTION zero() AS (0); SELECT zero();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("x").build()).addValue((Object)0L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testQualifiedNameUdfUnqualifiedCall() {
        String sql = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT baz();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("x").build()).addValue((Object)"uwu").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Qualified paths can't be resolved due to a bug in ZetaSQL: https://github.com/google/zetasql/issues/42")
    public void testQualifiedNameUdfQualifiedCallThrowsException() {
        String sql = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT foo.bar.baz();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("x").build()).addValue((Object)"uwu").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUnaryUdf() {
        String sql = "CREATE FUNCTION triple(x INT64) AS (3 * x); SELECT triple(triple(1));";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("x").build()).addValue((Object)9L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUdfWithinUdf() {
        String sql = "CREATE FUNCTION triple(x INT64) AS (3 * x); CREATE FUNCTION nonuple(x INT64) as (triple(triple(x))); SELECT nonuple(1);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("x").build()).addValue((Object)9L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUndefinedUdfThrowsException() {
        String sql = "CREATE FUNCTION foo() AS (bar()); CREATE FUNCTION bar() AS (foo()); SELECT foo();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(SqlException.class);
        this.thrown.expectMessage("Function not found: bar");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testRecursiveUdfThrowsException() {
        String sql = "CREATE FUNCTION omega() AS (omega()); SELECT omega();";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(SqlException.class);
        this.thrown.expectMessage("Function not found: omega");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testTimestampLiteralWithNonUTCTimeZone() {
        String sql = "SELECT TIMESTAMP '2018-12-10 10:38:59-10:00'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_with_time_zone").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithTimeZone((String)"2018-12-10 10:38:59-1000")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testExtractTimestamp() {
        String sql = "WITH Timestamps AS (\n  SELECT TIMESTAMP '2007-12-31 12:34:56' AS timestamp UNION ALL\n  SELECT TIMESTAMP '2009-12-31'\n)\nSELECT\n  EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n  EXTRACT(YEAR FROM timestamp) AS year,\n  EXTRACT(ISOWEEK FROM timestamp) AS isoweek,\n  EXTRACT(MINUTE FROM timestamp) AS minute\nFROM Timestamps\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addField("isoyear", Schema.FieldType.INT64).addField("year", Schema.FieldType.INT64).addField("isoweek", Schema.FieldType.INT64).addField("minute", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2008L, 2007L, 1L, 34L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2009L, 2009L, 53L, 0L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testExtractTimestampAtTimeZoneUnsupported() {
        String sql = "WITH Timestamps AS (\n  SELECT TIMESTAMP '2017-05-26' AS timestamp\n)\nSELECT\n  timestamp,\n  EXTRACT(HOUR FROM timestamp AT TIME ZONE 'America/Vancouver') AS hour,\n  EXTRACT(DAY FROM timestamp AT TIME ZONE 'America/Vancouver') AS day\nFROM Timestamps\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testExtractDateFromTimestampUnsupported() {
        String sql = "WITH Timestamps AS (\n  SELECT TIMESTAMP '2017-05-26' AS ts\n)\nSELECT\n  ts,\n  EXTRACT(DATE FROM ts) AS dt\nFROM Timestamps\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(SqlException.class);
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testStringFromTimestamp() {
        String sql = "SELECT STRING(TIMESTAMP '2008-12-25 15:30:00', 'America/Los_Angeles')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_timestamp_string").build()).addValues(new Object[]{"2008-12-25 07:30:00-08"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampFromString() {
        String sql = "SELECT TIMESTAMP('2008-12-25 15:30:00', 'America/Los_Angeles')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithTimeZone((String)"2008-12-25 15:30:00-08")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampAdd() {
        String sql = "SELECT TIMESTAMP_ADD(TIMESTAMP '2008-12-25 15:30:00 UTC', INTERVAL 5+5 MINUTE), TIMESTAMP_ADD(TIMESTAMP '2008-12-25 15:30:00+07:30', INTERVAL 10 MINUTE)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_add").addDateTimeField("f_timestamp_with_time_zone_add").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:40:00"), DateTimeUtils.parseTimestampWithTimeZone((String)"2008-12-25 15:40:00+0730")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampSub() {
        String sql = "SELECT TIMESTAMP_SUB(TIMESTAMP '2008-12-25 15:30:00 UTC', INTERVAL 5+5 MINUTE), TIMESTAMP_SUB(TIMESTAMP '2008-12-25 15:30:00+07:30', INTERVAL 10 MINUTE)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_sub").addDateTimeField("f_timestamp_with_time_zone_sub").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:20:00"), DateTimeUtils.parseTimestampWithTimeZone((String)"2008-12-25 15:20:00+0730")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampDiff() {
        String sql = "SELECT TIMESTAMP_DIFF(TIMESTAMP '2018-10-14 15:30:00.000 UTC', TIMESTAMP '2018-08-14 15:05:00.001 UTC', MILLISECOND)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_timestamp_diff").build()).addValues(new Object[]{5271899999L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampDiffNegativeResult() {
        String sql = "SELECT TIMESTAMP_DIFF(TIMESTAMP '2018-08-14', TIMESTAMP '2018-10-14', DAY)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_timestamp_diff").build()).addValues(new Object[]{-61L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampTrunc() {
        String sql = "SELECT TIMESTAMP_TRUNC(TIMESTAMP '2017-11-06 00:00:00+12', ISOWEEK, 'UTC')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_trunc").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2017-10-30 00:00:00")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testFormatTimestamp() {
        String sql = "SELECT FORMAT_TIMESTAMP('%D %T', TIMESTAMP '2018-10-14 15:30:00.123+00', 'UTC')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("f_timestamp_str").build()).addValues(new Object[]{"10/14/18 15:30:00"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParseTimestamp() {
        String sql = "SELECT PARSE_TIMESTAMP('%m-%d-%y %T', '10-14-18 15:30:00', 'UTC')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-10-14 15:30:00")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampFromInt64() {
        String sql = "SELECT TIMESTAMP_SECONDS(1230219000), TIMESTAMP_MILLIS(1230219000123) ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_seconds").addDateTimeField("f_timestamp_millis").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:30:00"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:30:00.123")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampToUnixInt64() {
        String sql = "SELECT UNIX_SECONDS(TIMESTAMP '2008-12-25 15:30:00 UTC'), UNIX_MILLIS(TIMESTAMP '2008-12-25 15:30:00.123 UTC')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("f_unix_seconds").addInt64Field("f_unix_millis").build()).addValues(new Object[]{1230219000L, 1230219000123L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampFromUnixInt64() {
        String sql = "SELECT TIMESTAMP_FROM_UNIX_SECONDS(1230219000), TIMESTAMP_FROM_UNIX_MILLIS(1230219000123) ";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addDateTimeField("f_timestamp_seconds").addDateTimeField("f_timestamp_millis").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:30:00"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2008-12-25 15:30:00.123")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDistinct() {
        String sql = "SELECT DISTINCT Key2 FROM aggregate_test_table";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("Key2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{10L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{11L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{12L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{13L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testDistinctOnNull() {
        String sql = "SELECT DISTINCT str_val FROM all_null_table";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("str_val", Schema.FieldType.DOUBLE).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testAnyValue() {
        String sql = "SELECT ANY_VALUE(double_val) FROM all_null_table";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("double_val", Schema.FieldType.DOUBLE).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectNULL() {
        String sql = "SELECT NULL";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("long_val", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryOne() {
        String sql = "With T1 AS (SELECT * FROM KeyValue), T2 AS (SELECT * FROM BigTable) SELECT T2.RowKey FROM T1 INNER JOIN T2 on T1.Key = T2.RowKey;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryTwo() {
        String sql = "WITH T1 AS (SELECT Key, COUNT(*) as value FROM KeyValue GROUP BY Key) SELECT T1.Key, T1.value FROM T1";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryThree() {
        String sql = "WITH T1 as (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) SELECT T1.Value, T1.Key FROM T1;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryFour() {
        String sql = "WITH T1 as (SELECT Value, Key FROM KeyValue) SELECT T1.Value, T1.Key FROM T1 WHERE T1.Key = 14 OR T1.Key = 15;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234", 14L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235", 15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQueryFive() {
        String sql = "WITH T1 AS (SELECT * FROM KeyValue) SELECT T1.Key, COUNT(*) FROM T1 GROUP BY T1.Key";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, 1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, 1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testWithQuerySix() {
        String sql = "WITH T1 AS (SELECT * FROM window_test_table_two) SELECT COUNT(*) as field_count, SESSION_START(\"INTERVAL 3 SECOND\") as window_start, SESSION_END(\"INTERVAL 3 SECOND\") as window_end FROM T1 GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("count_star").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 12, (Chronology)ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 12, (Chronology)ISOChronology.getInstanceUTC())}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 6, (Chronology)ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 6, (Chronology)ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUNNESTLiteral() {
        String sql = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', 'bar']);";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("str_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"foo"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUNNESTParameters() {
        String sql = "SELECT * FROM UNNEST(@p0);";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createArrayValue((ArrayType)TypeFactory.createArrayType((Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)), (Collection)ImmutableList.of((Object)Value.createStringValue((String)"foo"), (Object)Value.createStringValue((String)"bar"))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("str_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"foo"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="BEAM-9515")
    public void testUNNESTExpression() {
        String sql = "SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("str_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testNamedUNNESTLiteral() {
        String sql = "SELECT *, T1 FROM UNNEST(ARRAY<STRING>['foo', 'bar']) AS T1";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("str_field").addStringField("str2_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"foo", "foo"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"bar", "bar"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testNamedUNNESTLiteralOffset() {
        String sql = "SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        this.thrown.expect(UnsupportedOperationException.class);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
    }

    @Test
    public void testUnnestArrayColumn() {
        String sql = "SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("int_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)14L).build(), Row.withSchema((Schema)schema).addValue((Object)18L).build(), Row.withSchema((Schema)schema).addValue((Object)22L).build(), Row.withSchema((Schema)schema).addValue((Object)24L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testStringAggregation() {
        String sql = "SELECT STRING_AGG(fruit) AS string_agg FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("string_field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)"apple,pear,banana,pear").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Seeing exception in Beam, need further investigation on the cause of this failed query.")
    public void testNamedUNNESTJoin() {
        String sql = "SELECT * FROM table_with_array_for_unnest AS t1 LEFT JOIN UNNEST(t1.int_array_col) AS t2 on  t1.int_col = t2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[0]);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUnnestJoinStruct() {
        String sql = "SELECT b, x FROM UNNEST([STRUCT(true AS b, [3, 5] AS arr), STRUCT(false AS b, [7, 9] AS arr)]) t LEFT JOIN UNNEST(t.arr) x ON b";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testUnnestJoinLiteral() {
        String sql = "SELECT a, b FROM UNNEST([1, 1, 2, 3, 5, 8, 13, NULL]) a JOIN UNNEST([1, 2, 3, 5, 7, 11, 13, NULL]) b ON a = b";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testUnnestJoinSubquery() {
        String sql = "SELECT a, b FROM UNNEST([1, 2, 3]) a JOIN UNNEST(ARRAY(SELECT b FROM UNNEST([3, 2, 1]) b)) b ON a = b";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testCaseNoValue() {
        String sql = "SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("str_field").build()).addValue((Object)"seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCaseWithValue() {
        String sql = "SELECT CASE 1 WHEN 2 THEN 'not possible' ELSE 'seems right' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("str_field").build()).addValue((Object)"seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCaseWithValueMultipleCases() {
        String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' ELSE 'also not possible' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("str_field").build()).addValue((Object)"seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCaseWithValueNoElse() {
        String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addStringField("str_field").build()).addValue((Object)"seems right").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCaseNoValueNoElseNoMatch() {
        String sql = "SELECT CASE WHEN 'abc' = '123' THEN 'not possible' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addNullableField("str_field", Schema.FieldType.STRING).build()).addValue(null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCaseWithValueNoElseNoMatch() {
        String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' END";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addNullableField("str_field", Schema.FieldType.STRING).build()).addValue(null).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCastToDateWithCase() {
        String sql = "SELECT f_int, \nCASE WHEN CHAR_LENGTH(TRIM(f_string)) = 8 \n    THEN CAST (CONCAT(\n       SUBSTR(TRIM(f_string), 1, 4) \n        , '-' \n        , SUBSTR(TRIM(f_string), 5, 2) \n        , '-' \n        , SUBSTR(TRIM(f_string), 7, 2)) AS DATE)\n    ELSE NULL\nEND \nFROM table_for_case_when";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema resultType = Schema.builder().addInt64Field("f_long").addNullableField("f_date", Schema.FieldType.logicalType((Schema.LogicalType)SqlTypes.DATE)).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)resultType).addValues(new Object[]{1L, LocalDate.parse("2018-10-18")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIntersectAll() {
        String sql = "SELECT Key FROM aggregate_test_table INTERSECT ALL SELECT Key FROM aggregate_test_table_two";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema resultType = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)resultType).addValues(new Object[]{1L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIntersectDistinct() {
        String sql = "SELECT Key FROM aggregate_test_table INTERSECT DISTINCT SELECT Key FROM aggregate_test_table_two";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema resultType = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)resultType).addValues(new Object[]{1L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testExceptAll() {
        String sql = "SELECT Key FROM aggregate_test_table EXCEPT ALL SELECT Key FROM aggregate_test_table_two";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema resultType = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)resultType).addValues(new Object[]{1L}).build(), Row.withSchema((Schema)resultType).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectFromEmptyTable() {
        String sql = "SELECT * FROM table_empty;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[0]);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testStartsWithString() {
        String sql = "SELECT STARTS_WITH('string1', 'stri')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testStartsWithString2() {
        String sql = "SELECT STARTS_WITH(@p0, @p1)";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).put((Object)"p1", (Object)Value.createStringValue((String)"")).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testStartsWithString3() {
        String sql = "SELECT STARTS_WITH(@p0, @p1)";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).put((Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEndsWithString() {
        String sql = "SELECT STARTS_WITH('string1', 'ng0')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEndsWithString2() {
        String sql = "SELECT STARTS_WITH(@p0, @p1)";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).put((Object)"p1", (Object)Value.createStringValue((String)"")).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testEndsWithString3() {
        String sql = "SELECT STARTS_WITH(@p0, @p1)";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).put((Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Does not support DateTime literal.")
    public void testDateTimeLiteral() {
        String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testConcatWithOneParameters() {
        String sql = "SELECT concat('abc')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithTwoParameters() {
        String sql = "SELECT concat('abc', 'def')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abcdef"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithThreeParameters() {
        String sql = "SELECT concat('abc', 'def', 'xyz')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abcdefxyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithFourParameters() {
        String sql = "SELECT concat('abc', 'def', '  ', 'xyz')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abcdef  xyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithFiveParameters() {
        String sql = "SELECT concat('abc', 'def', '  ', 'xyz', 'kkk')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abcdef  xyzkkk"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithSixParameters() {
        String sql = "SELECT concat('abc', 'def', '  ', 'xyz', 'kkk', 'ttt')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abcdef  xyzkkkttt"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithNull1() {
        String sql = "SELECT CONCAT(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)""), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatWithNull2() {
        String sql = "SELECT CONCAT(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testNamedParameterQuery() {
        String sql = "SELECT @ColA AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"ColA", (Object)Value.createInt64Value((long)5L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{5L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testArrayStructLiteral() {
        String sql = "SELECT ARRAY<STRUCT<INT64, INT64>>[(11, 12)];";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema innerSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"s", (Schema.FieldType)Schema.FieldType.INT64), Schema.Field.of((String)"i", (Schema.FieldType)Schema.FieldType.INT64)});
        Schema schema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"field1", (Schema.FieldType)Schema.FieldType.array((Schema.FieldType)Schema.FieldType.row((Schema)innerSchema)))});
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)ImmutableList.of((Object)Row.withSchema((Schema)innerSchema).addValues(new Object[]{11L, 12L}).build())).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParameterStruct() {
        String sql = "SELECT @p as ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p", (Object)Value.createStructValue((StructType)TypeFactory.createStructType((Collection)ImmutableList.of((Object)new StructType.StructField("s", (Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING)), (Object)new StructType.StructField("i", (Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_INT64)))), (Collection)ImmutableList.of((Object)Value.createStringValue((String)"foo"), (Object)Value.createInt64Value((long)1L))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema innerSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"s", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"i", (Schema.FieldType)Schema.FieldType.INT64)});
        Schema schema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"field1", (Schema.FieldType)Schema.FieldType.row((Schema)innerSchema))});
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)Row.withSchema((Schema)innerSchema).addValues(new Object[]{"foo", 1L}).build()).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testParameterStructNested() {
        String sql = "SELECT @outer_struct.inner_struct.s as ColA";
        StructType innerStructType = TypeFactory.createStructType((Collection)ImmutableList.of((Object)new StructType.StructField("s", (Type)TypeFactory.createSimpleType((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING))));
        ImmutableMap params = ImmutableMap.of((Object)"outer_struct", (Object)Value.createStructValue((StructType)TypeFactory.createStructType((Collection)ImmutableList.of((Object)new StructType.StructField("inner_struct", (Type)innerStructType))), (Collection)ImmutableList.of((Object)Value.createStructValue((StructType)innerStructType, (Collection)ImmutableList.of((Object)Value.createStringValue((String)"foo"))))));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)"foo").build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatNamedParameterQuery() {
        String sql = "SELECT CONCAT(@p0, @p1) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)""), (Object)"p1", (Object)Value.createStringValue((String)"A"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"A"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testConcatPositionalParameterQuery() {
        String sql = "SELECT CONCAT(?, ?, ?) AS ColA";
        ImmutableList params = ImmutableList.of((Object)Value.createStringValue((String)"a"), (Object)Value.createStringValue((String)"b"), (Object)Value.createStringValue((String)"c"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (List)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testReplace1() {
        String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)""), (Object)"p1", (Object)Value.createStringValue((String)""), (Object)"p2", (Object)Value.createStringValue((String)"a"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{""}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testReplace2() {
        String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abc"), (Object)"p1", (Object)Value.createStringValue((String)""), (Object)"p2", (Object)Value.createStringValue((String)"xyz"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abc"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testReplace3() {
        String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)""), (Object)"p1", (Object)Value.createStringValue((String)""), (Object)"p2", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testReplace4() {
        String sql = "SELECT REPLACE(@p0, @p1, @p2) AS ColA";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p2", (Object)Value.createStringValue((String)""));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTrim1() {
        String sql = "SELECT trim(@p0)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"   a b c   "));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"a b c"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTrim2() {
        String sql = "SELECT trim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abxyzab"), (Object)"p1", (Object)Value.createStringValue((String)"ab"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"xyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTrim3() {
        String sql = "SELECT trim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLTrim1() {
        String sql = "SELECT ltrim(@p0)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"   a b c   "));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"a b c   "}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLTrim2() {
        String sql = "SELECT ltrim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abxyzab"), (Object)"p1", (Object)Value.createStringValue((String)"ab"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"xyzab"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testLTrim3() {
        String sql = "SELECT ltrim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testRTrim1() {
        String sql = "SELECT rtrim(@p0)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"   a b c   "));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"   a b c"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testRTrim2() {
        String sql = "SELECT rtrim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abxyzab"), (Object)"p1", (Object)Value.createStringValue((String)"ab"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"abxyz"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testRTrim3() {
        String sql = "SELECT rtrim(@p0, @p1)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING), (Object)"p1", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field1", Schema.FieldType.STRING).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="https://jira.apache.org/jira/browse/BEAM-9191")
    public void testCastBytesToString1() {
        String sql = "SELECT CAST(@p0 AS STRING)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createBytesValue((ByteString)ByteString.copyFromUtf8((String)"`")));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"`"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCastBytesToString2() {
        String sql = "SELECT CAST(b'b' AS STRING)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"b"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="https://jira.apache.org/jira/browse/BEAM-9191")
    public void testCastBytesToStringFromTable() {
        String sql = "SELECT CAST(bytes_col AS STRING) FROM table_all_types";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"1"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"2"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"3"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"4"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"5"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCastStringToTS() {
        String sql = "SELECT CAST('2019-01-15 13:21:03' AS TIMESTAMP)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field_1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2019-01-15 13:21:03")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCastStringToString() {
        String sql = "SELECT CAST(@p0 AS STRING)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)""));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{""}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCastStringToInt64() {
        String sql = "SELECT CAST(@p0 AS INT64)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"123"));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{123L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectConstant() {
        String sql = "SELECT 'hi'";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"hi"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Does not support DATE_ADD.")
    public void testDateAddWithParameter() {
        String sql = "SELECT DATE_ADD(@p0, INTERVAL @p1 DAY), DATE_ADD(@p2, INTERVAL @p3 DAY), DATE_ADD(@p4, INTERVAL @p5 YEAR), DATE_ADD(@p6, INTERVAL @p7 DAY), DATE_ADD(@p8, INTERVAL @p9 MONTH)";
        ImmutableMap params = ImmutableMap.builder().put((Object)"p0", (Object)Value.createDateValue((int)0)).put((Object)"p1", (Object)Value.createInt64Value((long)2L)).put((Object)"p2", (Object)DateTimeUtils.parseDateToValue((String)"2019-01-01")).put((Object)"p3", (Object)Value.createInt64Value((long)2L)).put((Object)"p4", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_DATE)).put((Object)"p5", (Object)Value.createInt64Value((long)1L)).put((Object)"p6", (Object)DateTimeUtils.parseDateToValue((String)"2000-02-29")).put((Object)"p7", (Object)Value.createInt64Value((long)-365L)).put((Object)"p8", (Object)DateTimeUtils.parseDateToValue((String)"1999-03-31")).put((Object)"p9", (Object)Value.createInt64Value((long)-1L)).build();
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field1").addDateTimeField("field2").addNullableField("field3", Schema.FieldType.DATETIME).addDateTimeField("field4").addDateTimeField("field5").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTimeUtils.parseDate((String)"1970-01-03"), DateTimeUtils.parseDate((String)"2019-01-03"), null, DateTimeUtils.parseDate((String)"1999-03-01"), DateTimeUtils.parseDate((String)"1999-02-28")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="Does not support TIME_ADD.")
    public void testTimeAddWithParameter() {
        String sql = "SELECT TIME_ADD(@p0, INTERVAL @p1 SECOND)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)DateTimeUtils.parseTimeToValue((String)"12:13:14.123"), (Object)"p1", (Object)Value.createInt64Value((long)1L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTimeUtils.parseTime((String)"12:13:15.123")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampAddWithParameter1() {
        String sql = "SELECT TIMESTAMP_ADD(@p0, INTERVAL @p1 MILLISECOND)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)DateTimeUtils.parseTimestampWithTZToValue((String)"2001-01-01 00:00:00+00"), (Object)"p1", (Object)Value.createInt64Value((long)1L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTimeUtils.parseTimestampWithTimeZone((String)"2001-01-01 00:00:00.001+00")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTimestampAddWithParameter2() {
        String sql = "SELECT TIMESTAMP_ADD(@p0, INTERVAL @p1 MINUTE)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)DateTimeUtils.parseTimestampWithTZToValue((String)"2008-12-25 15:30:00+07:30"), (Object)"p1", (Object)Value.createInt64Value((long)10L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addDateTimeField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{DateTimeUtils.parseTimestampWithTimeZone((String)"2008-12-25 15:40:00+07:30")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="[BEAM-8593] ZetaSQL does not support Map type")
    public void testSelectFromTableWithMap() {
        String sql = "SELECT row_field FROM table_with_map";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema rowSchema = Schema.builder().addInt64Field("row_id").addStringField("data").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)Schema.builder().addRowField("row_field", rowSchema).build()).addValues(new Object[]{Row.withSchema((Schema)rowSchema).addValues(new Object[]{1L, "data1"}).build()}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSubQuery() {
        String sql = "select sum(Key) from KeyValue\ngroup by (select Key)";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("Does not support sub-queries");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testSubstr() {
        String sql = "SELECT substr(@p0, @p1, @p2)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abc"), (Object)"p1", (Object)Value.createInt64Value((long)-2L), (Object)"p2", (Object)Value.createInt64Value((long)1L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"b"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSubstrWithLargeValueExpectException() {
        String sql = "SELECT substr(@p0, @p1, @p2)";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createStringValue((String)"abc"), (Object)"p1", (Object)Value.createInt64Value((long)0x80000000L), (Object)"p2", (Object)Value.createInt64Value((long)-2147483649L));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        this.thrown.expect(RuntimeException.class);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectAll() {
        String sql = "SELECT ALL Key, Value FROM KeyValue;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectDistinct() {
        String sql = "SELECT DISTINCT Key FROM aggregate_test_table;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{1L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{2L}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectDistinct2() {
        String sql = "SELECT DISTINCT val.BYTES\nfrom (select b\"BYTES\" BYTES union all\n      select b\"bytes\" union all\n      select b\"ByTeS\") val";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addByteArrayField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"BYTES".getBytes(StandardCharsets.UTF_8)}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"ByTeS".getBytes(StandardCharsets.UTF_8)}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"bytes".getBytes(StandardCharsets.UTF_8)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectBytes() {
        String sql = "SELECT b\"ByTes\"";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addByteArrayField("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"ByTes".getBytes(StandardCharsets.UTF_8)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectExcept() {
        String sql = "SELECT * EXCEPT (Key, ts) FROM KeyValue;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue234"}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{"KeyValue235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSelectReplace() {
        String sql = "WITH orders AS\n  (SELECT 5 as order_id,\n  \"sprocket\" as item_name,\n  200 as quantity)\nSELECT * REPLACE (\"widget\" AS item_name)\nFROM orders";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").addInt64Field("field3").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{5L, "widget", 200L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testUnionAllBasic() {
        String sql = "SELECT row_id FROM table_all_types UNION ALL SELECT row_id FROM table_all_types_2";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)1L).build(), Row.withSchema((Schema)schema).addValue((Object)2L).build(), Row.withSchema((Schema)schema).addValue((Object)3L).build(), Row.withSchema((Schema)schema).addValue((Object)4L).build(), Row.withSchema((Schema)schema).addValue((Object)5L).build(), Row.withSchema((Schema)schema).addValue((Object)6L).build(), Row.withSchema((Schema)schema).addValue((Object)7L).build(), Row.withSchema((Schema)schema).addValue((Object)8L).build(), Row.withSchema((Schema)schema).addValue((Object)9L).build(), Row.withSchema((Schema)schema).addValue((Object)10L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testAVGWithLongInput() {
        String sql = "SELECT AVG(f_int_1) FROM aggregate_test_table;";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("AVG(LONG) is not supported. You might want to use AVG(CAST(expression AS DOUBLE).");
        zetaSQLQueryPlanner.convertToBeamRel(sql);
    }

    @Test
    public void testReverseString() {
        String sql = "SELECT REVERSE('abc');";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addStringField("field2").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{"cba"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCharLength() {
        String sql = "SELECT CHAR_LENGTH('abc');";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testCharLengthNull() {
        String sql = "SELECT CHAR_LENGTH(@p0);";
        ImmutableMap params = ImmutableMap.of((Object)"p0", (Object)Value.createSimpleNullValue((ZetaSQLType.TypeKind)ZetaSQLType.TypeKind.TYPE_STRING));
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addNullableField("field", Schema.FieldType.INT64).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{null}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testTumbleAsTVF() {
        String sql = "select Key, Value, ts, window_start, window_end from TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND')";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        ImmutableMap params = ImmutableMap.of();
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("Key").addStringField("Value").addDateTimeField("ts").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{14L, "KeyValue234", DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01 21:26:06"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01 21:26:06"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01 21:26:07")}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{15L, "KeyValue235", DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01 21:26:07"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01 21:26:07"), DateTimeUtils.parseTimestampWithUTCTimeZone((String)"2018-07-01T21:26:08")}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testIsNullTrueFalse() {
        String sql = "WITH Src AS (\n  SELECT NULL as data UNION ALL\n  SELECT TRUE UNION ALL\n  SELECT FALSE\n)\nSELECT\n  data IS NULL as isnull,\n  data IS NOT NULL as isnotnull,\n  data IS TRUE as istrue,\n  data IS NOT TRUE as isnottrue,\n  data IS FALSE as isfalse,\n  data IS NOT FALSE as isnotfalse\nFROM Src\n";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        ImmutableMap params = ImmutableMap.of();
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, (Map)params);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addField("isnull", Schema.FieldType.BOOLEAN).addField("isnotnull", Schema.FieldType.BOOLEAN).addField("istrue", Schema.FieldType.BOOLEAN).addField("isnottrue", Schema.FieldType.BOOLEAN).addField("isfalse", Schema.FieldType.BOOLEAN).addField("isnotfalse", Schema.FieldType.BOOLEAN).build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{true, false, false, true, false, true}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{false, true, true, false, false, true}).build(), Row.withSchema((Schema)schema).addValues(new Object[]{false, true, false, true, true, false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testZetaSQLBitOr() {
        String sql = "SELECT BIT_OR(row_id) FROM table_all_types GROUP BY bool_col";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValues(new Object[]{3L}).build(), Row.withSchema((Schema)schema).addValue((Object)7L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    @Ignore(value="NULL values don't work correctly. (https://issues.apache.org/jira/browse/BEAM-10379)")
    public void testZetaSQLBitAnd() {
        String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema schema = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)schema).addValue((Object)1L).build(), Row.withSchema((Schema)schema).addValue((Object)0L).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }

    @Test
    public void testSimpleTableName() {
        String sql = "SELECT Key FROM KeyValue";
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
        PCollection stream = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)beamRelNode);
        Schema singleField = Schema.builder().addInt64Field("field1").build();
        PAssert.that((PCollection)stream).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)singleField).addValues(new Object[]{14L}).build(), Row.withSchema((Schema)singleField).addValues(new Object[]{15L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes((long)PIPELINE_EXECUTION_WAITTIME_MINUTES));
    }
}

