/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.math.BigDecimal;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.BeamSqlDslBase;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.BeamRecordType;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for GROUP BY aggregations, DISTINCT and windowed aggregations in the Beam SQL DSL.
 *
 * <p>Most scenarios run twice, once against {@code boundedInput1} and once against
 * {@code unboundedInput1}, both inherited from {@link BeamSqlDslBase}.
 *
 * <p>NOTE(review): several queries alias the count column as {@code getFieldCount} while the
 * expected record types name that column {@code "size"}. Confirm that record equality does not
 * depend on field names before "fixing" either side.
 */
public class BeamSqlDslAggregationTest
extends BeamSqlDslBase {
    /** Bounded input of seven rows whose averages and variances are not whole numbers. */
    public PCollection<BeamRecord> boundedInput3;

    /**
     * Builds {@link #boundedInput3}: one row per seed value {@code v}, laid out as
     * {@code (f_int = v, f_double = v, f_int2 = 0, f_decimal = v)}.
     */
    @Before
    public void setUp() {
        BeamRecordSqlType rowTypeInTableB = BeamRecordSqlType.create(
            Arrays.asList("f_int", "f_double", "f_int2", "f_decimal"),
            Arrays.asList(Types.INTEGER, Types.DOUBLE, Types.INTEGER, Types.DECIMAL));

        List<BeamRecord> recordsInTableB = new ArrayList<>();
        for (int value : new int[] {1, 4, 7, 13, 5, 10, 17}) {
            recordsInTableB.add(
                new BeamRecord(rowTypeInTableB, value, (double) value, 0, new BigDecimal(value)));
        }

        this.boundedInput3 = PBegin.in(this.pipeline).apply(
            "boundedInput3",
            Create.of(recordsInTableB).withCoder(rowTypeInTableB.getRecordCoder()));
    }

    /** GROUP BY without a window function, on the bounded input. */
    @Test
    public void testAggregationWithoutWindowWithBounded() throws Exception {
        this.runAggregationWithoutWindow(this.boundedInput1);
    }

    /** GROUP BY without a window function, on the unbounded input. */
    @Test
    public void testAggregationWithoutWindowWithUnbounded() throws Exception {
        this.runAggregationWithoutWindow(this.unboundedInput1);
    }

    /**
     * Runs a plain {@code COUNT(*) ... GROUP BY f_int2} over {@code input} and expects a single
     * group {@code (0, 4)}.
     */
    private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2";

        PCollection<BeamRecord> result =
            input.apply("testAggregationWithoutWindow", BeamSql.query(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList("f_int2", "size"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));
        BeamRecord record = new BeamRecord(resultType, 0, 4L);

        PAssert.that(result).containsInAnyOrder(record);
        this.pipeline.run().waitUntilFinish();
    }

    /** Built-in aggregation functions, on the bounded input. */
    @Test
    public void testAggregationFunctionsWithBounded() throws Exception {
        this.runAggregationFunctions(this.boundedInput1);
    }

    /** Built-in aggregation functions, on the unbounded input. */
    @Test
    public void testAggregationFunctionsWithUnbounded() throws Exception {
        this.runAggregationFunctions(this.unboundedInput1);
    }

    /**
     * Runs COUNT/SUM/AVG/MAX/MIN/VAR_POP/VAR_SAMP over the long, short, byte, float, double,
     * timestamp and int columns of {@code input} (registered as {@code TABLE_A}) and expects a
     * single aggregated row.
     */
    private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception {
        String sql = "select f_int2, count(*) as getFieldCount, "
            + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1, "
            + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2, "
            + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3, "
            + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4, "
            + "sum(f_double) as sum5, avg(f_double) as avg5, max(f_double) as max5, min(f_double) as min5, "
            + "max(f_timestamp) as max6, min(f_timestamp) as min6, "
            + "var_pop(f_double) as varpop1, var_samp(f_double) as varsamp1, "
            + "var_pop(f_int) as varpop2, var_samp(f_int) as varsamp2 "
            + "FROM TABLE_A group by f_int2";

        PCollection<BeamRecord> result =
            PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
                .apply("testAggregationFunctions", BeamSql.queryMulti(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList(
                "f_int2", "size",
                "sum1", "avg1", "max1", "min1",
                "sum2", "avg2", "max2", "min2",
                "sum3", "avg3", "max3", "min3",
                "sum4", "avg4", "max4", "min4",
                "sum5", "avg5", "max5", "min5",
                "max6", "min6",
                "varpop1", "varsamp1",
                "varpop2", "varsamp2"),
            Arrays.asList(
                Types.INTEGER, Types.BIGINT,
                Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
                Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
                Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT,
                Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.FLOAT,
                Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
                Types.TIMESTAMP, Types.TIMESTAMP,
                Types.DOUBLE, Types.DOUBLE,
                Types.INTEGER, Types.INTEGER));

        BeamRecord record = new BeamRecord(
            resultType,
            0, 4L,
            10000L, 2500L, 4000L, 1000L,
            (short) 10, (short) 2, (short) 4, (short) 1,
            (byte) 10, (byte) 2, (byte) 4, (byte) 1,
            10.0f, 2.5f, 4.0f, 1.0f,
            10.0, 2.5, 4.0, 1.0,
            FORMAT.parse("2017-01-01 02:04:03"), FORMAT.parse("2017-01-01 01:01:03"),
            1.25, 1.666666667,
            1, 1);

        PAssert.that(result).containsInAnyOrder(record);
        this.pipeline.run().waitUntilFinish();
    }

    /**
     * AVG / VAR_POP / VAR_SAMP over {@link #boundedInput3}; the output is checked by
     * {@link CheckerBigDecimalDivide} rather than by record equality because the double results
     * are compared with a tolerance.
     */
    @Test
    public void testAggregationFunctionsWithBoundedOnBigDecimalDivide() throws Exception {
        String sql = "SELECT AVG(f_double) as avg1, AVG(f_int) as avg2, "
            + "VAR_POP(f_double) as varpop1, VAR_POP(f_int) as varpop2, "
            + "VAR_SAMP(f_double) as varsamp1, VAR_SAMP(f_int) as varsamp2 "
            + "FROM PCOLLECTION GROUP BY f_int2";

        PCollection<BeamRecord> result =
            this.boundedInput3.apply("testAggregationWithDecimalValue", BeamSql.query(sql));

        PAssert.that(result).satisfies(new CheckerBigDecimalDivide());
        this.pipeline.run().waitUntilFinish();
    }

    /** SELECT DISTINCT, on the bounded input. */
    @Test
    public void testDistinctWithBounded() throws Exception {
        this.runDistinct(this.boundedInput1);
    }

    /** SELECT DISTINCT, on the unbounded input. */
    @Test
    public void testDistinctWithUnbounded() throws Exception {
        this.runDistinct(this.unboundedInput1);
    }

    /** Runs {@code SELECT distinct f_int, f_long} over {@code input} and expects four rows. */
    private void runDistinct(PCollection<BeamRecord> input) throws Exception {
        String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";

        PCollection<BeamRecord> result = input.apply("testDistinct", BeamSql.query(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList("f_int", "f_long"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));

        PAssert.that(result).containsInAnyOrder(
            new BeamRecord(resultType, 1, 1000L),
            new BeamRecord(resultType, 2, 2000L),
            new BeamRecord(resultType, 3, 3000L),
            new BeamRecord(resultType, 4, 4000L));
        this.pipeline.run().waitUntilFinish();
    }

    /** GROUP BY with a TUMBLE window, on the bounded input. */
    @Test
    public void testTumbleWindowWithBounded() throws Exception {
        this.runTumbleWindow(this.boundedInput1);
    }

    /** GROUP BY with a TUMBLE window, on the unbounded input. */
    @Test
    public void testTumbleWindowWithUnbounded() throws Exception {
        this.runTumbleWindow(this.unboundedInput1);
    }

    /**
     * Counts rows of {@code input} (registered as {@code TABLE_A}) per one-hour TUMBLE window
     * and expects two windows.
     */
    private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, "
            + "TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start` "
            + "FROM TABLE_A "
            + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";

        PCollection<BeamRecord> result =
            PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
                .apply("testTumbleWindow", BeamSql.queryMulti(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList("f_int2", "size", "window_start"),
            Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));

        PAssert.that(result).containsInAnyOrder(
            new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00")),
            new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00")));
        this.pipeline.run().waitUntilFinish();
    }

    /** GROUP BY with a HOP window, on the bounded input. */
    @Test
    public void testHopWindowWithBounded() throws Exception {
        this.runHopWindow(this.boundedInput1);
    }

    /** GROUP BY with a HOP window, on the unbounded input. */
    @Test
    public void testHopWindowWithUnbounded() throws Exception {
        this.runHopWindow(this.unboundedInput1);
    }

    /**
     * Counts rows of {@code input} per HOP window declared with a one-hour and a thirty-minute
     * interval and expects four windows.
     */
    private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, "
            + "HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start` "
            + "FROM PCOLLECTION "
            + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";

        PCollection<BeamRecord> result = input.apply("testHopWindow", BeamSql.query(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList("f_int2", "size", "window_start"),
            Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));

        PAssert.that(result).containsInAnyOrder(
            new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 00:30:00")),
            new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00")),
            new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 01:30:00")),
            new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00")));
        this.pipeline.run().waitUntilFinish();
    }

    /** GROUP BY with a SESSION window, on the bounded input. */
    @Test
    public void testSessionWindowWithBounded() throws Exception {
        this.runSessionWindow(this.boundedInput1);
    }

    /** GROUP BY with a SESSION window, on the unbounded input. */
    @Test
    public void testSessionWindowWithUnbounded() throws Exception {
        this.runSessionWindow(this.unboundedInput1);
    }

    /**
     * Counts rows of {@code input} (registered as {@code TABLE_A}) per SESSION window declared
     * with a five-minute interval and expects two sessions.
     */
    private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`, "
            + "SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start` "
            + "FROM TABLE_A "
            + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";

        PCollection<BeamRecord> result =
            PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
                .apply("testSessionWindow", BeamSql.queryMulti(sql));

        BeamRecordSqlType resultType = BeamRecordSqlType.create(
            Arrays.asList("f_int2", "size", "window_start"),
            Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));

        PAssert.that(result).containsInAnyOrder(
            new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:01:03")),
            new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:04:03")));
        this.pipeline.run().waitUntilFinish();
    }

    /** Applying TUMBLE to the {@code f_long} column must be rejected with an IllegalStateException. */
    @Test
    public void testWindowOnNonTimestampField() throws Exception {
        this.exceptions.expect(IllegalStateException.class);
        this.exceptions.expectMessage(
            "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
        this.pipeline.enableAbandonedNodeEnforcement(false);

        String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM TABLE_A "
            + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";

        // The output is deliberately not captured: the test only expects the exception.
        PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), this.boundedInput1)
            .apply("testWindowOnNonTimestampField", BeamSql.queryMulti(sql));
        this.pipeline.run().waitUntilFinish();
    }

    /** {@code COUNT(DISTINCT *)} must be rejected with an IllegalStateException. */
    @Test
    public void testUnsupportedDistinct() throws Exception {
        this.exceptions.expect(IllegalStateException.class);
        this.exceptions.expectMessage("Encountered \"*\"");
        this.pipeline.enableAbandonedNodeEnforcement(false);

        String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";

        // The output is deliberately not captured: the test only expects the exception.
        this.boundedInput1.apply("testUnsupportedDistinct", BeamSql.query(sql));
        this.pipeline.run().waitUntilFinish();
    }

    /**
     * Asserts that the output consists of exactly one row and that its averages and variances
     * match the expected values (doubles within {@code 1e-7}, integers exactly).
     */
    private static class CheckerBigDecimalDivide
    implements SerializableFunction<Iterable<BeamRecord>, Void> {
        @Override
        public Void apply(Iterable<BeamRecord> input) {
            Iterator<BeamRecord> iter = input.iterator();
            Assert.assertTrue(iter.hasNext());
            BeamRecord row = iter.next();

            // JUnit argument order is (expected, actual, delta).
            Assert.assertEquals(8.142857143, row.getDouble("avg1"), 1.0E-7);
            Assert.assertEquals(8, (int) row.getInteger("avg2"));
            Assert.assertEquals(26.40816326, row.getDouble("varpop1"), 1.0E-7);
            Assert.assertEquals(26, (int) row.getInteger("varpop2"));
            Assert.assertEquals(30.80952381, row.getDouble("varsamp1"), 1.0E-7);
            Assert.assertEquals(30, (int) row.getInteger("varsamp2"));

            // Exactly one aggregated row is expected.
            Assert.assertFalse(iter.hasNext());
            return null;
        }
    }
}

