/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

public class BeamComplexTypeTest {
    /** Leaf row layout: a string column followed by a 64-bit integer column. */
    private static final Schema innerRowSchema =
            Schema.builder()
                    .addStringField("string_field")
                    .addInt64Field("long_field")
                    .build();

    /** Row layout with a string column and an array-of-INT64 column. */
    private static final Schema innerRowWithArraySchema =
            Schema.builder()
                    .addStringField("string_field")
                    .addArrayField("array_field", Schema.FieldType.INT64)
                    .build();

    /** Row layout whose two columns (a nested row and an array of rows) are both nullable. */
    private static final Schema nullableInnerRowSchema =
            Schema.builder()
                    .addNullableField("inner_row_field", Schema.FieldType.row(innerRowSchema))
                    .addNullableField(
                            "array_field",
                            Schema.FieldType.array(Schema.FieldType.row(innerRowSchema)))
                    .build();

    /** Top-level layout with three nullable columns: a row, an array of rows, and a nullable-inner row. */
    private static final Schema nullableNestedRowWithArraySchema =
            Schema.builder()
                    .addNullableField("field1", Schema.FieldType.row(innerRowWithArraySchema))
                    .addNullableField(
                            "field2",
                            Schema.FieldType.array(Schema.FieldType.row(innerRowWithArraySchema)))
                    .addNullableField("field3", Schema.FieldType.row(nullableInnerRowSchema))
                    .build();

    /** Row layout that interleaves two scalar columns with two {@link #innerRowSchema} row columns. */
    private static final Schema nestedRowSchema =
            Schema.builder()
                    .addStringField("nonRowfield1")
                    .addRowField("RowField", innerRowSchema)
                    .addInt64Field("nonRowfield2")
                    .addRowField("RowFieldTwo", innerRowSchema)
                    .build();

    /** Row layout with a string, an INT64, and an array-of-INT64 column. */
    private static final Schema rowWithArraySchema =
            Schema.builder()
                    .addStringField("field1")
                    .addInt64Field("field2")
                    .addArrayField("field3", Schema.FieldType.INT64)
                    .build();
    /**
     * Read-only table provider named {@code test_provider} that the table-based tests query.
     * It registers five tables, each with a single column called {@code col} and a single row.
     */
    private static final ReadOnlyTableProvider readOnlyTableProvider =
            new ReadOnlyTableProvider(
                    "test_provider",
                    ImmutableMap.of(
                            // col: ARRAY<ROW(string_field, long_field)>
                            "arrayWithRowTestTable",
                            TestBoundedTable.of(
                                            Schema.FieldType.array(Schema.FieldType.row(innerRowSchema)),
                                            "col")
                                    .addRows(
                                            Arrays.asList(
                                                    Row.withSchema(innerRowSchema)
                                                            .addValues("str", 1L)
                                                            .build())),
                            // col: ARRAY<ARRAY<INT64>>
                            "nestedArrayTestTable",
                            TestBoundedTable.of(
                                            Schema.FieldType.array(
                                                    Schema.FieldType.array(Schema.FieldType.INT64)),
                                            "col")
                                    .addRows(
                                            Arrays.asList(
                                                    Arrays.asList(1L, 2L, 3L),
                                                    Arrays.asList(4L, 5L))),
                            // col: row that itself contains two inner rows
                            "nestedRowTestTable",
                            TestBoundedTable.of(Schema.FieldType.row(nestedRowSchema), "col")
                                    .addRows(
                                            Row.withSchema(nestedRowSchema)
                                                    .addValues(
                                                            "str",
                                                            Row.withSchema(innerRowSchema)
                                                                    .addValues("inner_str_one", 1L)
                                                                    .build(),
                                                            2L,
                                                            Row.withSchema(innerRowSchema)
                                                                    .addValues("inner_str_two", 3L)
                                                                    .build())
                                                    .build()),
                            // col: flat row
                            "basicRowTestTable",
                            TestBoundedTable.of(Schema.FieldType.row(innerRowSchema), "col")
                                    .addRows(
                                            Row.withSchema(innerRowSchema)
                                                    .addValues("innerStr", 1L)
                                                    .build()),
                            // col: row with an array field
                            "rowWithArrayTestTable",
                            TestBoundedTable.of(Schema.FieldType.row(rowWithArraySchema), "col")
                                    .addRows(
                                            Row.withSchema(rowWithArraySchema)
                                                    .addValues("str", 4L, Arrays.asList(5L, 6L))
                                                    .build())));
    /** Test pipeline, registered as a JUnit rule, that every test in this class builds on and runs. */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Selects the whole {@code col} column of {@code nestedRowTestTable} and asserts that the
     * row-within-row value comes back unchanged.
     */
    @Test
    public void testNestedRow() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan = env.parseQuery("SELECT nestedRowTestTable.col FROM nestedRowTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema = Schema.builder().addRowField("col", nestedRowSchema).build();
        Row firstInner = Row.withSchema(innerRowSchema).addValues("inner_str_one", 1L).build();
        Row secondInner = Row.withSchema(innerRowSchema).addValues("inner_str_two", 3L).build();
        Row nested =
                Row.withSchema(nestedRowSchema)
                        .addValues("str", firstInner, 2L, secondInner)
                        .build();
        Row expected = Row.withSchema(expectedSchema).addValues(nested).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Queries {@code col[1]} on the array-of-rows table and asserts the result is the row
     * {@code ("str", 1)} wrapped in a single row-typed output column.
     */
    @Test
    public void testArrayWithRow() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery("SELECT arrayWithRowTestTable.col[1] FROM arrayWithRowTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema = Schema.builder().addRowField("col", innerRowSchema).build();
        Row element = Row.withSchema(innerRowSchema).addValues("str", 1L).build();
        Row expected = Row.withSchema(expectedSchema).addValues(element).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Double-indexes the array-of-arrays column ({@code col[1][3]} and {@code col[2][1]}) and
     * asserts the two selected values are 3 and 4.
     */
    @Test
    public void testNestedArray() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery(
                        "SELECT nestedArrayTestTable.col[1][3], nestedArrayTestTable.col[2][1] FROM nestedArrayTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema =
                Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
        Row expected = Row.withSchema(expectedSchema).addValues(3L, 4L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Selects the row-typed {@code col} column of {@code basicRowTestTable} and asserts it equals
     * the row {@code ("innerStr", 1)}.
     */
    @Test
    public void testBasicRow() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan = env.parseQuery("SELECT col FROM basicRowTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema = Schema.builder().addRowField("col", innerRowSchema).build();
        Row inner = Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build();
        Row expected = Row.withSchema(expectedSchema).addValues(inner).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Evaluates the literal {@code ARRAY[1, 2, 3]} aliased as {@code f_arr} and asserts the output
     * is an INT32 array column holding 1, 2, 3.
     */
    @Test
    public void testArrayConstructor() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan = env.parseQuery("SELECT ARRAY[1, 2, 3] f_arr");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema =
                Schema.builder().addArrayField("f_arr", Schema.FieldType.INT32).build();
        Row expected = Row.withSchema(expectedSchema).addValue(Arrays.asList(1, 2, 3)).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Reaches through a row into its array field ({@code col.field3[2]}) and asserts the selected
     * value is 6.
     */
    @Test
    public void testRowWithArray() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery("SELECT rowWithArrayTestTable.col.field3[2] FROM rowWithArrayTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema = Schema.builder().addInt64Field("int64").build();
        Row expected = Row.withSchema(expectedSchema).addValue(6L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Reads one scalar from each of the two inner rows of {@code nestedRowTestTable.col} and
     * asserts the pair {@code ("inner_str_one", 3)}.
     */
    @Test
    public void testFieldAccessToNestedRow() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery(
                        "SELECT nestedRowTestTable.col.RowField.string_field, nestedRowTestTable.col.RowFieldTwo.long_field FROM nestedRowTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema =
                Schema.builder().addStringField("field1").addInt64Field("field2").build();
        Row expected = Row.withSchema(expectedSchema).addValues("inner_str_one", 3L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Selects an inner row ({@code col.RowField}) of the nested-row table and expects its two
     * fields {@code ("inner_str_one", 1)}. Currently disabled; see the linked issue.
     */
    @Ignore("https://issues.apache.org/jira/browse/BEAM-5189")
    @Test
    public void testSelectInnerRowOfNestedRow() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery("SELECT nestedRowTestTable.col.RowField FROM nestedRowTestTable");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        Schema expectedSchema =
                Schema.builder().addStringField("field1").addInt64Field("field2").build();
        Row expected = Row.withSchema(expectedSchema).addValues("inner_str_one", 1L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Selects a BYTES field nested inside a row ({@code t.nested.c_bytes}) and expects the same
     * byte array back as column {@code f0}. Currently disabled; see the linked issue.
     */
    @Ignore(value="https://issues.apache.org/jira/browse/BEAM-12782")
    @Test
    public void testNestedBytes() {
        byte[] bytes = new byte[] {-70, -83, -54, -2};

        Schema nestedInputSchema = Schema.of(Schema.Field.of("c_bytes", Schema.FieldType.BYTES));
        Schema inputSchema =
                Schema.of(Schema.Field.of("nested", Schema.FieldType.row(nestedInputSchema)));
        Schema outputSchema = Schema.of(Schema.Field.of("f0", Schema.FieldType.BYTES));

        Row nestedRow = Row.withSchema(nestedInputSchema).addValue(bytes).build();
        Row row = Row.withSchema(inputSchema).addValue(nestedRow).build();
        Row expected = Row.withSchema(outputSchema).addValue(bytes).build();

        PCollection<Row> result =
                pipeline
                        .apply(Create.of(row).withRowSchema(inputSchema))
                        .apply(SqlTransform.query("SELECT t.nested.c_bytes AS f0 FROM PCOLLECTION t"));

        PAssert.that(result).containsInAnyOrder(expected);
        // Bounded wait, matching the other tests in this class, instead of a bare run().
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Selects the first element of a BYTES array nested inside a row
     * ({@code t.nested.c_bytes[1]}) and expects that byte array back as column {@code f0}.
     * Currently disabled; see the linked issue.
     */
    @Ignore(value="https://issues.apache.org/jira/browse/BEAM-12782")
    @Test
    public void testNestedArrayOfBytes() {
        byte[] bytes = new byte[] {-70, -83, -54, -2};

        Schema nestedInputSchema =
                Schema.of(
                        Schema.Field.of("c_bytes", Schema.FieldType.array(Schema.FieldType.BYTES)));
        Schema inputSchema =
                Schema.of(Schema.Field.of("nested", Schema.FieldType.row(nestedInputSchema)));
        Schema outputSchema = Schema.of(Schema.Field.of("f0", Schema.FieldType.BYTES));

        Row nestedRow = Row.withSchema(nestedInputSchema).addValue(ImmutableList.of(bytes)).build();
        Row row = Row.withSchema(inputSchema).addValue(nestedRow).build();
        Row expected = Row.withSchema(outputSchema).addValue(bytes).build();

        PCollection<Row> result =
                pipeline
                        .apply(Create.of(row).withRowSchema(inputSchema))
                        .apply(
                                SqlTransform.query(
                                        "SELECT t.nested.c_bytes[1] AS f0 FROM PCOLLECTION t"));

        PAssert.that(result).containsInAnyOrder(expected);
        // Bounded wait, matching the other tests in this class, instead of a bare run().
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Evaluates a {@code ROW(...)} literal containing two nested {@code ROW(...)} literals and
     * asserts the resulting nested row structure and values.
     */
    @Test
    public void testRowConstructor() {
        BeamSqlEnv env = BeamSqlEnv.inMemory(readOnlyTableProvider);
        BeamRelNode plan =
                env.parseQuery("SELECT ROW(1, ROW(2, 3), 'str', ROW('str2', 'str3'))");
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, plan);

        // Schemas, innermost first.
        Schema intRow = Schema.builder().addInt32Field("field2").addInt32Field("field3").build();
        Schema strRow = Schema.builder().addStringField("field5").addStringField("field6").build();
        Schema innerRow =
                Schema.builder()
                        .addInt32Field("field1")
                        .addRowField("intRow", intRow)
                        .addStringField("field4")
                        .addRowField("strRow", strRow)
                        .build();
        Schema outerRow = Schema.builder().addRowField("row", innerRow).build();

        // Values, innermost first.
        Row ints = Row.withSchema(intRow).addValues(2, 3).build();
        Row strings = Row.withSchema(strRow).addValues("str2", "str3").build();
        Row inner = Row.withSchema(innerRow).addValues(1, ints, "str", strings).build();
        Row expected = Row.withSchema(outerRow).addValues(inner).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Feeds an all-null row and asserts that field access through the null row column and the
     * null array-of-rows column both produce null.
     */
    @Test
    public void testNullRows() {
        Row allNullInput = Row.nullRow(nullableNestedRowWithArraySchema);

        PCollection<Row> input =
                pipeline
                        .apply(Create.of(allNullInput))
                        .setRowSchema(nullableNestedRowWithArraySchema);
        PCollection<Row> result =
                input.apply(
                        SqlTransform.query(
                                "select PCOLLECTION.field1.string_field as row_string_field, "
                                        + "PCOLLECTION.field2[2].string_field as array_string_field from PCOLLECTION"));

        Schema expectedSchema =
                Schema.builder()
                        .addNullableField("row_string_field", Schema.FieldType.STRING)
                        .addNullableField("array_string_field", Schema.FieldType.STRING)
                        .build();

        PAssert.that(result).containsInAnyOrder(Row.nullRow(expectedSchema));
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Builds a row whose {@code field3} has a null inner row but a populated array, then asserts
     * that access through the null inner row gives null while the array element's field gives 1000.
     */
    @Test
    public void testNullInnerRow() {
        Row arrayElement = Row.withSchema(innerRowSchema).addValues("str", 1000L).build();
        Row field3Value =
                Row.withSchema(nullableInnerRowSchema)
                        .addValues(null, Arrays.asList(arrayElement))
                        .build();
        Row inputRow =
                Row.withSchema(nullableNestedRowWithArraySchema)
                        .addValues(null, null, field3Value)
                        .build();

        PCollection<Row> input =
                pipeline.apply(Create.of(inputRow)).setRowSchema(nullableNestedRowWithArraySchema);
        PCollection<Row> result =
                input.apply(
                        SqlTransform.query(
                                "select PCOLLECTION.field3.inner_row_field.string_field as string_field, "
                                        + "PCOLLECTION.field3.array_field[1].long_field as long_field from PCOLLECTION"));

        Schema expectedSchema =
                Schema.builder()
                        .addNullableField("string_field", Schema.FieldType.STRING)
                        .addInt64Field("long_field")
                        .build();
        Row expected = Row.withSchema(expectedSchema).addValues(null, 1000L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Applies {@code EXTRACT(YEAR ...)} and {@code EXTRACT(MONTH ...)} to a DATETIME column and to
     * a null DATETIME column; expects 2019 and 6 for the former and nulls for the latter.
     */
    @Test
    public void testDatetimeFields() {
        Instant timestamp = new Instant(1561671380000L);
        Schema inputSchema =
                Schema.builder()
                        .addField("dateTimeField", Schema.FieldType.DATETIME)
                        .addNullableField("nullableDateTimeField", Schema.FieldType.DATETIME)
                        .build();
        Row inputRow = Row.withSchema(inputSchema).addValues(timestamp, null).build();

        String sql =
                "select "
                        + " dateTimeField, "
                        + " nullableDateTimeField, "
                        + " EXTRACT(YEAR from dateTimeField) as yyyy, "
                        + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
                        + " EXTRACT(MONTH from dateTimeField) as mm, "
                        + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder()
                        .addField("dateTimeField", Schema.FieldType.DATETIME)
                        .addNullableField("nullableDateTimeField", Schema.FieldType.DATETIME)
                        .addField("yyyy", Schema.FieldType.INT64)
                        .addNullableField("year_with_null", Schema.FieldType.INT64)
                        .addField("mm", Schema.FieldType.INT64)
                        .addNullableField("month_with_null", Schema.FieldType.INT64)
                        .build();
        Row expected =
                Row.withSchema(expectedSchema)
                        .addValues(timestamp, null, 2019L, null, 6L, null)
                        .build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Exercises the SQL DATE logical type: passthrough, {@code EXTRACT(DAY ...)}, and adding a
     * one-day interval, each on a populated column (2019-06-27) and on a null column.
     */
    @Test
    public void testSqlLogicalTypeDateFields() {
        Schema.FieldType dateType = Schema.FieldType.logicalType(SqlTypes.DATE);

        Schema inputSchema =
                Schema.builder()
                        .addField("dateTypeField", dateType)
                        .addNullableField("nullableDateTypeField", dateType)
                        .build();
        Row inputRow =
                Row.withSchema(inputSchema).addValues(LocalDate.of(2019, 6, 27), null).build();

        String sql =
                "select "
                        + " dateTypeField, "
                        + " nullableDateTypeField, "
                        + " EXTRACT(DAY from dateTypeField) as dd, "
                        + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, "
                        + " dateTypeField + interval '1' day as date_with_day_added, "
                        + " nullableDateTypeField + interval '1' day as day_added_with_null "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder()
                        .addField("dateTypeField", dateType)
                        .addNullableField("nullableDateTypeField", dateType)
                        .addField("dd", Schema.FieldType.INT64)
                        .addNullableField("day_with_null", Schema.FieldType.INT64)
                        .addField("date_with_day_added", dateType)
                        .addNullableField("day_added_with_null", dateType)
                        .build();
        Row expected =
                Row.withSchema(expectedSchema)
                        .addValues(
                                LocalDate.of(2019, 6, 27),
                                null,
                                27L,
                                null,
                                LocalDate.of(2019, 6, 28),
                                null)
                        .build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Exercises the SQL TIME logical type: passthrough, adding a one-hour interval, and
     * subtracting a 60-second interval, each on a populated column (01:00:00) and on a null column.
     */
    @Test
    public void testSqlLogicalTypeTimeFields() {
        Schema.FieldType timeType = Schema.FieldType.logicalType(SqlTypes.TIME);

        Schema inputSchema =
                Schema.builder()
                        .addField("timeTypeField", timeType)
                        .addNullableField("nullableTimeTypeField", timeType)
                        .build();
        Row inputRow = Row.withSchema(inputSchema).addValues(LocalTime.of(1, 0, 0), null).build();

        // Note: the "*_seconds_added" aliases label a subtraction of 60 seconds.
        String sql =
                "select "
                        + " timeTypeField, "
                        + " nullableTimeTypeField, "
                        + " timeTypeField + interval '1' hour as time_with_hour_added, "
                        + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, "
                        + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, "
                        + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder()
                        .addField("timeTypeField", timeType)
                        .addNullableField("nullableTimeTypeField", timeType)
                        .addField("time_with_hour_added", timeType)
                        .addNullableField("hour_added_with_null", timeType)
                        .addField("time_with_seconds_added", timeType)
                        .addNullableField("seconds_added_with_null", timeType)
                        .build();
        Row expected =
                Row.withSchema(expectedSchema)
                        .addValues(
                                LocalTime.of(1, 0, 0),
                                null,
                                LocalTime.of(2, 0, 0),
                                null,
                                LocalTime.of(0, 59, 0),
                                null)
                        .build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Exercises the SQL DATETIME logical type: passthrough, {@code EXTRACT} of year/month/day, and
     * hour/second/day interval arithmetic, each on a populated column (2008-12-25 15:30:00) and on
     * a null column.
     */
    @Test
    public void testSqlLogicalTypeDatetimeFields() {
        Schema.FieldType datetimeType = Schema.FieldType.logicalType(SqlTypes.DATETIME);

        Schema inputSchema =
                Schema.builder()
                        .addField("dateTimeField", datetimeType)
                        .addNullableField("nullableDateTimeField", datetimeType)
                        .build();
        Row inputRow =
                Row.withSchema(inputSchema)
                        .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0), null)
                        .build();

        // Note: the "*_seconds_added" aliases label a subtraction of 60 seconds.
        String sql =
                "select "
                        + " dateTimeField, "
                        + " nullableDateTimeField, "
                        + " EXTRACT(YEAR from dateTimeField) as yyyy, "
                        + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, "
                        + " EXTRACT(MONTH from dateTimeField) as mm, "
                        + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null, "
                        + " dateTimeField + interval '1' hour as time_with_hour_added, "
                        + " nullableDateTimeField + interval '1' hour as hour_added_with_null, "
                        + " dateTimeField - INTERVAL '60' SECOND as time_with_seconds_added, "
                        + " nullableDateTimeField - INTERVAL '60' SECOND as seconds_added_with_null, "
                        + " EXTRACT(DAY from dateTimeField) as dd, "
                        + " EXTRACT(DAY from nullableDateTimeField) as day_with_null, "
                        + " dateTimeField + interval '1' day as date_with_day_added, "
                        + " nullableDateTimeField + interval '1' day as day_added_with_null "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder()
                        .addField("dateTimeField", datetimeType)
                        .addNullableField("nullableDateTimeField", datetimeType)
                        .addField("yyyy", Schema.FieldType.INT64)
                        .addNullableField("year_with_null", Schema.FieldType.INT64)
                        .addField("mm", Schema.FieldType.INT64)
                        .addNullableField("month_with_null", Schema.FieldType.INT64)
                        .addField("time_with_hour_added", datetimeType)
                        .addNullableField("hour_added_with_null", datetimeType)
                        .addField("time_with_seconds_added", datetimeType)
                        .addNullableField("seconds_added_with_null", datetimeType)
                        .addField("dd", Schema.FieldType.INT64)
                        .addNullableField("day_with_null", Schema.FieldType.INT64)
                        .addField("date_with_day_added", datetimeType)
                        .addNullableField("day_added_with_null", datetimeType)
                        .build();
        Row expected =
                Row.withSchema(expectedSchema)
                        .addValues(
                                LocalDateTime.of(2008, 12, 25, 15, 30, 0),
                                null,
                                2008L,
                                null,
                                12L,
                                null,
                                LocalDateTime.of(2008, 12, 25, 16, 30, 0),
                                null,
                                LocalDateTime.of(2008, 12, 25, 15, 29, 0),
                                null,
                                25L,
                                null,
                                LocalDateTime.of(2008, 12, 26, 15, 30, 0),
                                null)
                        .build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    /**
     * Looks up a row stored as a map value ({@code mapWithValueAsRow['key']}) and asserts that a
     * scalar field and an array element read from it are {@code "RED"} and 20.
     */
    @Test
    public void testMapWithRowAsValue() {
        Schema inputSchema =
                Schema.builder()
                        .addMapField(
                                "mapWithValueAsRow",
                                Schema.FieldType.STRING,
                                Schema.FieldType.row(rowWithArraySchema))
                        .build();

        Row mapValue =
                Row.withSchema(rowWithArraySchema)
                        .addValues("RED", 5L, Arrays.asList(10L, 20L, 30L))
                        .build();
        Map<String, Row> rowsByKey = new HashMap<>();
        rowsByKey.put("key", mapValue);
        Row inputRow = Row.withSchema(inputSchema).addValue(rowsByKey).build();

        String sql =
                "select  PCOLLECTION.mapWithValueAsRow['key'].field1 as color, "
                        + "PCOLLECTION.mapWithValueAsRow['key'].field3[2]  as num  "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder().addStringField("color").addInt64Field("num").build();
        Row expected = Row.withSchema(expectedSchema).addValues("RED", 20L).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(1L));
    }

    /**
     * Reads through map-of-row columns where either a field of the row value or the map itself
     * is null, and asserts those accesses yield null while the populated string field yields
     * {@code "str"}.
     */
    @Test
    public void testMapWithNullRowFields() {
        Schema valueSchema =
                Schema.builder()
                        .addNullableField("strField", Schema.FieldType.STRING)
                        .addNullableField("arrField", Schema.FieldType.array(Schema.FieldType.INT64))
                        .build();
        Schema inputSchema =
                Schema.builder()
                        .addMapField(
                                "mapField",
                                Schema.FieldType.STRING,
                                Schema.FieldType.row(valueSchema))
                        .addNullableField(
                                "nullableMapField",
                                Schema.FieldType.map(
                                        Schema.FieldType.STRING, Schema.FieldType.row(valueSchema)))
                        .build();

        Row mapValue = Row.withSchema(valueSchema).addValues("str", null).build();
        Map<String, Row> rowsByKey = new HashMap<>();
        rowsByKey.put("key", mapValue);
        Row inputRow = Row.withSchema(inputSchema).addValues(rowsByKey, null).build();

        String sql =
                "select PCOLLECTION.mapField['key'].strField as str, "
                        + "PCOLLECTION.mapField['key'].arrField[1] as arr, "
                        + "PCOLLECTION.nullableMapField['key'].arrField[1] as nullableField "
                        + " from PCOLLECTION";
        PCollection<Row> result =
                pipeline
                        .apply(Create.of(inputRow))
                        .setRowSchema(inputSchema)
                        .apply(SqlTransform.query(sql));

        Schema expectedSchema =
                Schema.builder()
                        .addStringField("str")
                        .addNullableField("arr", Schema.FieldType.INT64)
                        .addNullableField("nullableField", Schema.FieldType.INT64)
                        .build();
        Row expected = Row.withSchema(expectedSchema).addValues("str", null, null).build();

        PAssert.that(result).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish(Duration.standardMinutes(1L));
    }
}

