package org.apache.beam.sdk.extensions.sql.zetasql;

import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/zetasql/StreamingSqlTest.class */
public class StreamingSqlTest extends ZetaSqlTestBase {

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() {
        initialize();
    }

    @Test
    public void testZetaSQLBasicSlidingWindowing() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COUNT(*) as field_count, HOP_START(\"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\") as window_start, HOP_END(\"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\") as window_end FROM window_test_table GROUP BY HOP(ts, \"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\");"));
        Schema build = Schema.builder().addInt64Field("count_star").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 5, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 10, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 11, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLBasicSessionWindowing() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COUNT(*) as field_count, SESSION_START(\"INTERVAL 3 SECOND\") as window_start, SESSION_END(\"INTERVAL 3 SECOND\") as window_end FROM window_test_table_two GROUP BY SESSION(ts, \"INTERVAL 3 SECOND\");"));
        Schema build = Schema.builder().addInt64Field("count_star").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 12, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testZetaSQLNestedQueryFour() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT t1.Value, TUMBLE_START('INTERVAL 1 SECOND') AS period_start, MIN(t2.Value) as min_v FROM KeyValue AS t1 INNER JOIN BigTable AS t2 on t1.Key = t2.RowKey GROUP BY t1.Value, TUMBLE(t2.ts, 'INTERVAL 1 SECOND')"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("value").addDateTimeField("min_v").addStringField("period_start").build()).addValues(new Object[]{"KeyValue235", new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), "BigTable235"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testWithQuerySeven() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("WITH T1 AS (SELECT * FROM KeyValue) SELECT COUNT(*) as field_count, TUMBLE_START(\"INTERVAL 1 SECOND\") as window_start, TUMBLE_END(\"INTERVAL 1 SECOND\") as window_end FROM T1 GROUP BY TUMBLE(ts, \"INTERVAL 1 SECOND\");"));
        Schema build = Schema.builder().addInt64Field("count_start").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testTVFTumbleAggregation() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COUNT(*) as field_count, window_start FROM TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND') GROUP BY window_start"));
        Schema build = Schema.builder().addInt64Field("field_count").addDateTimeField("window_start").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testHopAsTVFAggregation() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT COUNT(*) as field_count, window_start, window_end FROM HOP((select * from window_test_table), descriptor(ts), \"INTERVAL 1 SECOND\", \"INTERVAL 2 SECOND\") GROUP BY window_start, window_end;"));
        Schema build = Schema.builder().addInt64Field("field_count").addDateTimeField("field1").addDateTimeField("field2").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 5, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 7, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 6, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{2L, new DateTime(2018, 7, 1, 21, 26, 8, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 10, ISOChronology.getInstanceUTC())}).build(), Row.withSchema(build).addValues(new Object[]{1L, new DateTime(2018, 7, 1, 21, 26, 9, ISOChronology.getInstanceUTC()), new DateTime(2018, 7, 1, 21, 26, 11, ISOChronology.getInstanceUTC())}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void runTumbleWindow() throws Exception {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT f_long, COUNT(*) AS getFieldCount, window_start,  window_end  FROM TUMBLE((select * from streaming_sql_test_table_a), descriptor(f_timestamp), \"INTERVAL 1 HOUR\")  GROUP BY window_start, window_end, f_long"));
        Schema build = Schema.builder().addInt64Field("f_long").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(Arrays.asList(Row.withSchema(build).addValues(new Object[]{1000L, 3L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:00:00")}).build(), Row.withSchema(build).addValues(new Object[]{4000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 03:00:00")}).build()));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void runTumbleWindowFor31Days() throws Exception {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT f_long, COUNT(*) AS getFieldCount, window_start,  window_end  FROM TUMBLE((select * from streaming_sql_test_table_b), descriptor(f_timestamp), \"INTERVAL 31 DAY\")  GROUP BY f_long, window_start, window_end"));
        Schema build = Schema.builder().addInt64Field("f_long").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(Arrays.asList(Row.withSchema(build).addValues(new Object[]{1000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2016-12-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-08 00:00:00")}).build(), Row.withSchema(build).addValues(new Object[]{2000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-02-08 00:00:00")}).build(), Row.withSchema(build).addValues(new Object[]{3000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-02-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-03-11 00:00:00")}).build()));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void runHopWindow() throws Exception {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT f_long, COUNT(*) AS `getFieldCount`,  `window_start`,   `window_end`  FROM HOP((select * from streaming_sql_test_table_a), descriptor(f_timestamp),  \"INTERVAL 30 MINUTE\", \"INTERVAL 1 HOUR\") GROUP BY f_long, window_start, window_end"));
        Schema build = Schema.builder().addInt64Field("f_long").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(Arrays.asList(Row.withSchema(build).addValues(new Object[]{1000L, 3L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 00:30:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:30:00")}).build(), Row.withSchema(build).addValues(new Object[]{1000L, 3L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:00:00")}).build(), Row.withSchema(build).addValues(new Object[]{4000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:30:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:30:00")}).build(), Row.withSchema(build).addValues(new Object[]{4000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 03:00:00")}).build()));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void runSessionWindow() throws Exception {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT f_long, COUNT(*) AS `getFieldCount`, `window_start`,  `window_end`  FROM SESSION((select * from streaming_sql_test_table_a), descriptor(f_timestamp),  descriptor(f_long), \"INTERVAL 5 MINUTE\") GROUP BY f_long, window_start, window_end"));
        Schema build = Schema.builder().addInt64Field("f_long").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(Arrays.asList(Row.withSchema(build).addValues(new Object[]{1000L, 3L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:01:03"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:11:03")}).build(), Row.withSchema(build).addValues(new Object[]{4000L, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:04:03"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:09:03")}).build()));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void runSessionWindow2() throws Exception {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT f_long, f_string, COUNT(*) AS `getFieldCount`, `window_start`, `window_end`  FROM SESSION((select * from streaming_sql_test_table_a), descriptor(f_timestamp),  descriptor(f_long, f_string), \"INTERVAL 5 MINUTE\") GROUP BY f_long, f_string, window_start, window_end"));
        Schema build = Schema.builder().addInt64Field("f_long").addStringField("f_string").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        PAssert.that(pCollection).containsInAnyOrder(Arrays.asList(Row.withSchema(build).addValues(new Object[]{1000L, "string_row1", 2L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:01:03"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:07:03")}).build(), Row.withSchema(build).addValues(new Object[]{1000L, "string_row3", 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:06:03"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 01:11:03")}).build(), Row.withSchema(build).addValues(new Object[]{4000L, "第四行", 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:04:03"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-01 02:09:03")}).build()));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    @Ignore("[https://github.com/apache/beam/issues/20101] CAST operator does not work fully due to bugs in unparsing")
    public void testZetaSQLStructFieldAccessInTumble() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT TUMBLE_START('INTERVAL 1 MINUTE') FROM table_with_struct_ts_string AS A GROUP BY TUMBLE(CAST(A.struct_col.struct_col_str AS TIMESTAMP), 'INTERVAL 1 MINUTE')"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("field").build()).addValue(DateTimeUtils.parseTimestampWithUTCTimeZone("2019-01-15 13:21:00")).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }
}
