/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSqlDslBase;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.internal.matchers.ThrowableMessageMatcher;

public class BeamSqlDslAggregationTest
extends BeamSqlDslBase {
    public PCollection<Row> boundedInput3;

    @Before
    public void setUp() {
        Schema schemaInTableB = Schema.builder().addInt32Field("f_int").addDoubleField("f_double").addInt32Field("f_int2").addDecimalField("f_decimal").build();
        List<Row> rowsInTableB = TestUtils.RowsBuilder.of(schemaInTableB).addRows(1, 1.0, 0, new BigDecimal(1), 4, 4.0, 0, new BigDecimal(4), 7, 7.0, 0, new BigDecimal(7), 13, 13.0, 0, new BigDecimal(13), 5, 5.0, 0, new BigDecimal(5), 10, 10.0, 0, new BigDecimal(10), 17, 17.0, 0, new BigDecimal(17)).getRows();
        this.boundedInput3 = (PCollection)this.pipeline.apply("boundedInput3", (PTransform)Create.of(rowsInTableB).withSchema(schemaInTableB, SerializableFunctions.identity(), SerializableFunctions.identity()));
    }

    @Test
    public void testAggregationWithoutWindowWithBounded() throws Exception {
        this.runAggregationWithoutWindow((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testAggregationWithoutWindowWithUnbounded() throws Exception {
        this.runAggregationWithoutWindow((PCollection<Row>)this.unboundedInput1);
    }

    private void runAggregationWithoutWindow(PCollection<Row> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2";
        PCollection result = (PCollection)input.apply("testAggregationWithoutWindow", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").build();
        Row row = Row.withSchema((Schema)resultType).addValues(new Object[]{0, 4L}).build();
        PAssert.that((PCollection)result).containsInAnyOrder((Object[])new Row[]{row});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testAggregationFunctionsWithBounded() throws Exception {
        this.runAggregationFunctions((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testAggregationFunctionsWithUnbounded() throws Exception {
        this.runAggregationFunctions((PCollection<Row>)this.unboundedInput1);
    }

    private void runAggregationFunctions(PCollection<Row> input) throws Exception {
        String sql = "select f_int2, count(*) as getFieldCount, sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1, sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2, sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3, sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4, sum(f_double) as sum5, avg(f_double) as avg5, max(f_double) as max5, min(f_double) as min5, max(f_timestamp) as max6, min(f_timestamp) as min6, max(f_string) as max7, min(f_string) as min7, var_pop(f_double) as varpop1, var_samp(f_double) as varsamp1, var_pop(f_int) as varpop2, var_samp(f_int) as varsamp2 FROM TABLE_A group by f_int2";
        PCollection result = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("TABLE_A"), input).apply("testAggregationFunctions", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").addInt64Field("sum1").addInt64Field("avg1").addInt64Field("max1").addInt64Field("min1").addInt16Field("sum2").addInt16Field("avg2").addInt16Field("max2").addInt16Field("min2").addByteField("sum3").addByteField("avg3").addByteField("max3").addByteField("min3").addFloatField("sum4").addFloatField("avg4").addFloatField("max4").addFloatField("min4").addDoubleField("sum5").addDoubleField("avg5").addDoubleField("max5").addDoubleField("min5").addDateTimeField("max6").addDateTimeField("min6").addStringField("max7").addStringField("min7").addDoubleField("varpop1").addDoubleField("varsamp1").addInt32Field("varpop2").addInt32Field("varsamp2").build();
        Row row = Row.withSchema((Schema)resultType).addValues(new Object[]{0, 4L, 10000L, 2500L, 4000L, 1000L, (short)10, (short)2, (short)4, (short)1, (byte)10, (byte)2, (byte)4, (byte)1, Float.valueOf(10.0f), Float.valueOf(2.5f), Float.valueOf(4.0f), Float.valueOf(1.0f), 10.0, 2.5, 4.0, 1.0, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), "\u7b2c\u56db\u884c", "string_row1", 1.25, 1.666666667, 1, 1}).build();
        PAssert.that((PCollection)result).containsInAnyOrder((Object[])new Row[]{row});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testAggregationFunctionsWithBoundedOnBigDecimalDivide() throws Exception {
        String sql = "SELECT AVG(f_double) as avg1, AVG(f_int) as avg2, VAR_POP(f_double) as varpop1, VAR_POP(f_int) as varpop2, VAR_SAMP(f_double) as varsamp1, VAR_SAMP(f_int) as varsamp2 FROM PCOLLECTION GROUP BY f_int2";
        PCollection result = (PCollection)this.boundedInput3.apply("testAggregationWithDecimalValue", (PTransform)SqlTransform.query((String)sql));
        PAssert.that((PCollection)result).satisfies((SerializableFunction)new CheckerBigDecimalDivide());
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testDistinctWithBounded() throws Exception {
        this.runDistinct((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testDistinctWithUnbounded() throws Exception {
        this.runDistinct((PCollection<Row>)this.unboundedInput1);
    }

    private void runDistinct(PCollection<Row> input) throws Exception {
        String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
        PCollection result = (PCollection)input.apply("testDistinct", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int").addInt64Field("f_long").build();
        List<Row> expectedRows = TestUtils.RowsBuilder.of(resultType).addRows(1, 1000L, 2, 2000L, 3, 3000L, 4, 4000L).getRows();
        PAssert.that((PCollection)result).containsInAnyOrder(expectedRows);
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testTumbleWindowWithBounded() throws Exception {
        this.runTumbleWindow((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testTumbleWindowWithUnbounded() throws Exception {
        this.runTumbleWindow((PCollection<Row>)this.unboundedInput1);
    }

    @Test
    public void testTumbleWindowWith31DaysBounded() throws Exception {
        this.runTumbleWindowFor31Days((PCollection<Row>)this.boundedInputMonthly);
    }

    private void runTumbleWindowFor31Days(PCollection<Row> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, TUMBLE_START(f_timestamp, INTERVAL '31' DAY) AS `window_start`,  TUMBLE_END(f_timestamp, INTERVAL '31' DAY) AS `window_end`  FROM TABLE_A GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '31' DAY)";
        PCollection result = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("TABLE_A"), input).apply("testTumbleWindow", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        List<Row> expectedRows = TestUtils.RowsBuilder.of(resultType).addRows(0, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2016-12-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-08 00:00:00"), 0, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-01-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-02-08 00:00:00"), 0, 1L, DateTimeUtils.parseTimestampWithUTCTimeZone("2017-02-08 00:00:00"), DateTimeUtils.parseTimestampWithUTCTimeZone("2017-03-11 00:00:00")).getRows();
        PAssert.that((PCollection)result).containsInAnyOrder(expectedRows);
        this.pipeline.run().waitUntilFinish();
    }

    private void runTumbleWindow(PCollection<Row> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`,  TUMBLE_END(f_timestamp, INTERVAL '1' HOUR) AS `window_end`  FROM TABLE_A GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
        PCollection result = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("TABLE_A"), input).apply("testTumbleWindow", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        List<Row> expectedRows = TestUtils.RowsBuilder.of(resultType).addRows(0, 3L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:00:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), 0, 1L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 03:00:00")).getRows();
        PAssert.that((PCollection)result).containsInAnyOrder(expectedRows);
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    @Category(value={UsesTestStream.class})
    public void testTriggeredTumble() throws Exception {
        Schema inputSchema = Schema.builder().addInt32Field("f_int").addDateTimeField("f_timestamp").build();
        PCollection input = (PCollection)this.pipeline.apply((PTransform)TestStream.create((Schema)inputSchema, (SerializableFunction)SerializableFunctions.identity(), (SerializableFunction)SerializableFunctions.identity()).addElements((Object)Row.withSchema((Schema)inputSchema).addValues(new Object[]{1, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:01")}).build(), (Object[])new Row[]{Row.withSchema((Schema)inputSchema).addValues(new Object[]{2, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:01")}).build()}).addElements((Object)Row.withSchema((Schema)inputSchema).addValues(new Object[]{3, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:01")}).build(), (Object[])new Row[0]).addElements((Object)Row.withSchema((Schema)inputSchema).addValues(new Object[]{4, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:01")}).build(), (Object[])new Row[0]).advanceWatermarkToInfinity());
        String sql = "SELECT SUM(f_int) AS f_int_sum FROM PCOLLECTION GROUP BY TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
        Schema outputSchema = Schema.builder().addInt32Field("fn_int_sum").build();
        PCollection result = (PCollection)((PCollection)input.apply("Triggering", (PTransform)Window.configure().triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)1))).withAllowedLateness(Duration.ZERO).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY).accumulatingFiredPanes())).apply("Windowed Query", (PTransform)SqlTransform.query((String)sql));
        PAssert.that((PCollection)result).containsInAnyOrder(TestUtils.RowsBuilder.of(outputSchema).addRows(3).addRows(6).addRows(10).getRows());
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testHopWindowWithBounded() throws Exception {
        this.runHopWindow((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testHopWindowWithUnbounded() throws Exception {
        this.runHopWindow((PCollection<Row>)this.unboundedInput1);
    }

    private void runHopWindow(PCollection<Row> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, HOP_START(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) AS `window_start`,  HOP_END(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) AS `window_end`  FROM PCOLLECTION GROUP BY f_int2, HOP(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR)";
        PCollection result = (PCollection)input.apply("testHopWindow", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        List<Row> expectedRows = TestUtils.RowsBuilder.of(resultType).addRows(0, 3L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 00:30:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:30:00"), 0, 3L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:00:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), 0, 1L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:30:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:30:00"), 0, 1L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:00:00"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 03:00:00")).getRows();
        PAssert.that((PCollection)result).containsInAnyOrder(expectedRows);
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testSessionWindowWithBounded() throws Exception {
        this.runSessionWindow((PCollection<Row>)this.boundedInput1);
    }

    @Test
    public void testSessionWindowWithUnbounded() throws Exception {
        this.runSessionWindow((PCollection<Row>)this.unboundedInput1);
    }

    private void runSessionWindow(PCollection<Row> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`,  SESSION_END(f_timestamp, INTERVAL '5' MINUTE) AS `window_end`  FROM TABLE_A GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
        PCollection result = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("TABLE_A"), input).apply("testSessionWindow", (PTransform)SqlTransform.query((String)sql));
        Schema resultType = Schema.builder().addInt32Field("f_int2").addInt64Field("size").addDateTimeField("window_start").addDateTimeField("window_end").build();
        List<Row> expectedRows = TestUtils.RowsBuilder.of(resultType).addRows(0, 3L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 01:01:03"), 0, 1L, DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:04:03"), DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:04:03")).getRows();
        PAssert.that((PCollection)result).containsInAnyOrder(expectedRows);
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testWindowOnNonTimestampField() throws Exception {
        this.exceptions.expect(ParseException.class);
        this.exceptions.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'")));
        this.pipeline.enableAbandonedNodeEnforcement(false);
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM TABLE_A GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
        PCollection result = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("TABLE_A"), (PCollection)this.boundedInput1).apply("testWindowOnNonTimestampField", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUnsupportedDistinct() throws Exception {
        this.exceptions.expect(ParseException.class);
        this.exceptions.expectCause(ThrowableMessageMatcher.hasMessage((Matcher)Matchers.containsString((String)"Encountered \"*\"")));
        this.pipeline.enableAbandonedNodeEnforcement(false);
        String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
        PCollection result = (PCollection)this.boundedInput1.apply("testUnsupportedDistinct", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUnsupportedDistinct2() throws Exception {
        this.exceptions.expect(IllegalArgumentException.class);
        this.exceptions.expectMessage(Matchers.containsString((String)"COUNT DISTINCT"));
        this.pipeline.enableAbandonedNodeEnforcement(false);
        String sql = "SELECT f_int2, COUNT(DISTINCT f_long)FROM PCOLLECTION GROUP BY f_int2";
        this.boundedInput1.apply("testUnsupportedDistinct", (PTransform)SqlTransform.query((String)sql));
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUnsupportedGlobalWindowWithDefaultTrigger() {
        this.exceptions.expect(UnsupportedOperationException.class);
        this.pipeline.enableAbandonedNodeEnforcement(false);
        PCollection input = (PCollection)this.unboundedInput1.apply("unboundedInput1.globalWindow", (PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)DefaultTrigger.of()));
        String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
        input.apply("testUnsupportedGlobalWindows", (PTransform)SqlTransform.query((String)sql));
    }

    @Test
    public void testSupportsGlobalWindowWithCustomTrigger() throws Exception {
        this.pipeline.enableAbandonedNodeEnforcement(false);
        DateTime startTime = DateTimeUtils.parseTimestampWithoutTimeZone("2017-1-1 0:0:0");
        Schema type = Schema.builder().addInt32Field("f_intGroupingKey").addInt32Field("f_intValue").addDateTimeField("f_timestamp").build();
        Object[] rows = new Object[]{0, 1, startTime.plusSeconds(0), 0, 2, startTime.plusSeconds(1), 0, 3, startTime.plusSeconds(2), 0, 4, startTime.plusSeconds(3), 0, 5, startTime.plusSeconds(4), 0, 6, startTime.plusSeconds(6)};
        PCollection input = (PCollection)this.createTestPCollection(type, rows, "f_timestamp").apply((PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)2))).discardingFiredPanes().withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY));
        String sql = "SELECT SUM(f_intValue) AS `sum` FROM PCOLLECTION GROUP BY f_intGroupingKey";
        PCollection result = (PCollection)input.apply("sql", (PTransform)SqlTransform.query((String)sql));
        Assert.assertEquals((Object)new GlobalWindows(), (Object)result.getWindowingStrategy().getWindowFn());
        PAssert.that((PCollection)result).containsInAnyOrder(this.rowsWithSingleIntField("sum", Arrays.asList(3, 7, 11)));
        this.pipeline.run();
    }

    @Test
    public void testSupportsAggregationWithoutProjection() throws Exception {
        this.pipeline.enableAbandonedNodeEnforcement(false);
        Schema schema = Schema.builder().addInt32Field("f_intGroupingKey").addInt32Field("f_intValue").build();
        PCollection inputRows = ((PCollection)this.pipeline.apply((PTransform)Create.of(TestUtils.rowsBuilderOf(schema).addRows(0, 1, 0, 2, 1, 3, 2, 4, 2, 5).getRows()))).setSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity());
        String sql = "SELECT SUM(f_intValue) FROM PCOLLECTION GROUP BY f_intGroupingKey";
        PCollection result = (PCollection)inputRows.apply("sql", (PTransform)SqlTransform.query((String)sql));
        PAssert.that((PCollection)result).containsInAnyOrder(this.rowsWithSingleIntField("sum", Arrays.asList(3, 3, 9)));
        this.pipeline.run();
    }

    @Test
    public void testSupportsNonGlobalWindowWithCustomTrigger() {
        DateTime startTime = DateTimeUtils.parseTimestampWithoutTimeZone("2017-1-1 0:0:0");
        Schema type = Schema.builder().addInt32Field("f_intGroupingKey").addInt32Field("f_intValue").addDateTimeField("f_timestamp").build();
        Object[] rows = new Object[]{0, 1, startTime.plusSeconds(0), 0, 2, startTime.plusSeconds(1), 0, 3, startTime.plusSeconds(2), 0, 4, startTime.plusSeconds(3), 0, 5, startTime.plusSeconds(4), 0, 6, startTime.plusSeconds(6)};
        PCollection input = (PCollection)this.createTestPCollection(type, rows, "f_timestamp").apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)3L))).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)2))).discardingFiredPanes().withAllowedLateness(Duration.ZERO).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY));
        String sql = "SELECT SUM(f_intValue) AS `sum` FROM PCOLLECTION GROUP BY f_intGroupingKey";
        PCollection result = (PCollection)input.apply("sql", (PTransform)SqlTransform.query((String)sql));
        Assert.assertEquals((Object)FixedWindows.of((Duration)Duration.standardSeconds((long)3L)), (Object)result.getWindowingStrategy().getWindowFn());
        PAssert.that((PCollection)result).containsInAnyOrder(this.rowsWithSingleIntField("sum", Arrays.asList(3, 3, 9, 6)));
        this.pipeline.run();
    }

    private List<Row> rowsWithSingleIntField(String fieldName, List<Integer> values) {
        return TestUtils.rowsBuilderOf(Schema.builder().addInt32Field(fieldName).build()).addRows(values).getRows();
    }

    private PCollection<Row> createTestPCollection(Schema type, Object[] rows, String timestampField) {
        return TestUtils.rowsBuilderOf(type).addRows(rows).getPCollectionBuilder().inPipeline((Pipeline)this.pipeline).withTimestampField(timestampField).buildUnbounded();
    }

    private static class CheckerBigDecimalDivide
    implements SerializableFunction<Iterable<Row>, Void> {
        private CheckerBigDecimalDivide() {
        }

        public Void apply(Iterable<Row> input) {
            Iterator<Row> iter = input.iterator();
            Assert.assertTrue((boolean)iter.hasNext());
            Row row = iter.next();
            Assert.assertEquals((double)row.getDouble("avg1"), (double)8.142857143, (double)1.0E-7);
            Assert.assertTrue((row.getInt32("avg2") == 8 ? 1 : 0) != 0);
            Assert.assertEquals((double)row.getDouble("varpop1"), (double)26.40816326, (double)1.0E-7);
            Assert.assertTrue((row.getInt32("varpop2") == 26 ? 1 : 0) != 0);
            Assert.assertEquals((double)row.getDouble("varsamp1"), (double)30.80952381, (double)1.0E-7);
            Assert.assertTrue((row.getInt32("varsamp2") == 30 ? 1 : 0) != 0);
            Assert.assertFalse((boolean)iter.hasNext());
            return null;
        }
    }
}

