package org.apache.flink.table.planner.runtime.stream.sql;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.TestingUpsertTableSink;
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple3;
import scala.Tuple4;
import scala.Tuple7;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: WindowAggregateITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005Mc\u0001B\u0001\u0003\u0001M\u0011QcV5oI><\u0018iZ4sK\u001e\fG/Z%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005\u00191/\u001d7\u000b\u0005\u00151\u0011AB:ue\u0016\fWN\u0003\u0002\b\u0011\u00059!/\u001e8uS6,'BA\u0005\u000b\u0003\u001d\u0001H.\u00198oKJT!a\u0003\u0007\u0002\u000bQ\f'\r\\3\u000b\u00055q\u0011!\u00024mS:\\'BA\b\u0011\u0003\u0019\t\u0007/Y2iK*\t\u0011#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001)A\u0011Q\u0003G\u0007\u0002-)\u0011qCB\u0001\u0006kRLGn]\u0005\u00033Y\u0011!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u0005[>$W\r\u0005\u0002\u001ec9\u0011ad\f\b\u0003?9r!\u0001I\u0017\u000f\u0005\u0005bcB\u0001\u0012,\u001d\t\u0019#F\u0004\u0002%S9\u0011Q\u0005K\u0007\u0002M)\u0011qEE\u0001\u0007yI|w\u000e\u001e \n\u0003EI!a\u0004\t\n\u00055q\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011qCB\u0005\u0003aY\t!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016L!AM\u001a\u0003!M#\u0018\r^3CC\u000e\\WM\u001c3N_\u0012,'B\u0001\u0019\u0017\u0011\u0015)\u0004\u0001\"\u00017\u0003\u0019a\u0014N\\5u}Q\u0011q'\u000f\t\u0003q\u0001i\u0011A\u0001\u0005\u00067Q\u0002\r\u0001\b\u0005\bw\u0001\u0011\r\u0011\"\u0001=\u0003\u0011!\u0017\r^1\u0016\u0003u\u00022AP#H\u001b\u0005y$B\u0001!B\u0003%IW.\\;uC\ndWM\u0003\u0002C\u0007\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\u000b\u0003\u0011\u000bQa]2bY\u0006L!AR 
\u0003\t1K7\u000f\u001e\t\n\u0011&[e*\u0015+X?~k\u0011aQ\u0005\u0003\u0015\u000e\u0013a\u0001V;qY\u0016<\u0004C\u0001%M\u0013\ti5I\u0001\u0003M_:<\u0007C\u0001%P\u0013\t\u00016IA\u0002J]R\u0004\"\u0001\u0013*\n\u0005M\u001b%A\u0002#pk\ndW\r\u0005\u0002I+&\u0011ak\u0011\u0002\u0006\r2|\u0017\r\u001e\t\u00031vk\u0011!\u0017\u0006\u00035n\u000bA!\\1uQ*\tA,\u0001\u0003kCZ\f\u0017B\u00010Z\u0005)\u0011\u0015n\u001a#fG&l\u0017\r\u001c\t\u0003A\u000et!\u0001S1\n\u0005\t\u001c\u0015A\u0002)sK\u0012,g-\u0003\u0002eK\n11\u000b\u001e:j]\u001eT!AY\"\t\r\u001d\u0004\u0001\u0015!\u0003>\u0003\u0015!\u0017\r^1!\u0011\u0015I\u0007\u0001\"\u0001k\u0003i!Xm\u001d;Fm\u0016tG\u000fV5nKNc\u0017\u000eZ5oO^Kg\u000eZ8x)\u0005Y\u0007C\u0001%m\u0013\ti7I\u0001\u0003V]&$\bF\u00015p!\t\u00018/D\u0001r\u0015\t\u0011\b#A\u0003kk:LG/\u0003\u0002uc\n!A+Z:u\u0011\u00151\b\u0001\"\u0001k\u0003e!Xm\u001d;DCN\u001c\u0017\rZ5oOR+XN\u00197f/&tGm\\<)\u0005U|\u0007\"B=\u0001\t\u0003Q\u0017A\u0007;fgR,e/\u001a8u)&lWmU3tg&|gnV5oI><\bF\u0001=p\u0011\u0015a\b\u0001\"\u0001k\u00031\"Xm\u001d;Fm\u0016tG\u000fV5nKR+XN\u00197j]\u001e<\u0016N\u001c3po^KG\u000f[!mY><H*\u0019;f]\u0016\u001c8\u000f\u000b\u0002|_\")q\u0010\u0001C\u0001U\u0006)D/Z:u\t&\u001cH/\u001b8di\u0006;wmV5uQ6+'oZ3P]\u00163XM\u001c;US6,7+Z:tS>twI]8va^Kg\u000eZ8xQ\tqx\u000e\u0003\u0004\u0002\u0006\u0001!\tA[\u0001\u001di\u0016\u001cH/T5o\u001b\u0006Dx+\u001b;i)Vl'\r\\5oO^Kg\u000eZ8xQ\r\t\u0019a\u001c\u0005\b\u0003\u0017\u0001A\u0011BA\u0007\u0003E9\u0018\u000e\u001e5MCR,g)\u001b:f\t\u0016d\u0017-\u001f\u000b\u0006W\u0006=\u0011q\u0004\u0005\t\u0003#\tI\u00011\u0001\u0002\u0014\u0005YA/\u00192mK\u000e{gNZ5h!\u0011\t)\"a\u0007\u000e\u0005\u0005]!bAA\r\u0015\u0005\u0019\u0011\r]5\n\t\u0005u\u0011q\u0003\u0002\f)\u0006\u0014G.Z\"p]\u001aLw\r\u0003\u0005\u0002\"\u0005%\u0001\u0019AA\u0012\u0003!Ig\u000e^3sm\u0006d\u0007\u0003BA\u0013\u0003ci!!a\n\u000b\t\u0005%\u00121F\u0001\u0005i&lWM\u0003\u0003\u0002.\u0005=\u0012AB2p[6|gNC\u0002\u0002\u001a1IA!a\r\
u0002(\t!A+[7fQ\u001d\u0001\u0011qGA\"\u0003\u000b\u0002B!!\u000f\u0002@5\u0011\u00111\b\u0006\u0004\u0003{\t\u0018A\u0002:v]:,'/\u0003\u0003\u0002B\u0005m\"a\u0002*v]^KG\u000f[\u0001\u0006m\u0006dW/Z\u0012\u0003\u0003\u000f\u0002B!!\u0013\u0002P5\u0011\u00111\n\u0006\u0004\u0003\u001b\n\u0018a\u0002:v]:,'o]\u0005\u0005\u0003#\nYEA\u0007QCJ\fW.\u001a;fe&TX\r\u001a")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.class */
public class WindowAggregateITCase extends StreamingWithStateTestBase {
    /** Shared seven-column input rows; assigned once by the constructor and exposed via {@link #data()}. */
    private final List<Tuple7<Object, Object, Object, Object, BigDecimal, String, String>> data;

    // Cached Scala symbols used as column names when the tests convert streams to tables.
    // They are initialised here and never reassigned in this class, hence declared final.
    private static final Symbol symbol$1 = Symbol$.MODULE$.apply("rowtime");
    private static final Symbol symbol$2 = Symbol$.MODULE$.apply("int");
    private static final Symbol symbol$3 = Symbol$.MODULE$.apply("double");
    private static final Symbol symbol$4 = Symbol$.MODULE$.apply("float");
    private static final Symbol symbol$5 = Symbol$.MODULE$.apply("bigdec");
    private static final Symbol symbol$6 = Symbol$.MODULE$.apply("string");
    private static final Symbol symbol$7 = Symbol$.MODULE$.apply("name");
    private static final Symbol symbol$8 = Symbol$.MODULE$.apply("long");
    private static final Symbol symbol$9 = Symbol$.MODULE$.apply("a");
    private static final Symbol symbol$10 = Symbol$.MODULE$.apply("b");
    private static final Symbol symbol$11 = Symbol$.MODULE$.apply("c");

    /**
     * Returns the shared input rows that the constructor stored in the {@code data} field.
     *
     * @return the list of seven-element tuples held by this test instance
     */
    public List<Tuple7<Object, Object, Object, Object, BigDecimal, String, String>> data() {
        return data;
    }

    /**
     * Runs a grouped HOP-window aggregation (built-in aggregates plus the
     * {@code concat_distinct_agg} user-defined aggregate) over the shared {@link #data()} rows
     * and compares the sorted append-stream output with the expected rows.
     */
    @Test
    public void testEventTimeSlidingWindow() {
        // The query refers to this user-defined aggregate by name, so it is registered first.
        tEnv().registerFunction(
                "concat_distinct_agg",
                new JavaUserDefinedAggFunctions.ConcatDistinctAggFunction(),
                BasicTypeInfo.getInfoFor(String.class),
                TypeExtractor.createTypeInfo(JavaUserDefinedAggFunctions.ConcatAcc.class));

        // Column expressions for T1; the first one is marked with rowtime().
        final Expression[] columns = new Expression[]{
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime(),
                package$.MODULE$.symbol2FieldExpression(symbol$2),
                package$.MODULE$.symbol2FieldExpression(symbol$3),
                package$.MODULE$.symbol2FieldExpression(symbol$4),
                package$.MODULE$.symbol2FieldExpression(symbol$5),
                package$.MODULE$.symbol2FieldExpression(symbol$6),
                package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        tEnv().registerTable(
                "T1",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(data(), new WindowAggregateITCase$$anon$7(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  COUNT(DISTINCT `float`),\n        |  concat_distinct_agg(name)\n        |FROM T1\n        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)\n      ")).stripMargin();

        // Collect the query result as an append stream and run the job.
        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted before comparison, so emission order does not matter.
        final String[] expectedRows = new String[]{
                "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.008,1970-01-01T00:00:00.012,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.012,1970-01-01T00:00:00.016,1,1,1,1,b",
                "Hello world,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,1,b",
                "Hello,1970-01-01T00:00,1970-01-01T00:00:00.004,2,2,2,2,a",
                "Hello,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,3,3,3,2,a|b",
                "Hi,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "null,1970-01-01T00:00:00.028,1970-01-01T00:00:00.032,1,1,1,1,null",
                "null,1970-01-01T00:00:00.032,1970-01-01T00:00:00.036,1,1,1,1,null"
        };
        Assert.assertEquals(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$),
                sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs two stacked TUMBLE aggregations: the inner query counts rows per
     * {@code int}/{@code string} group and window, the outer query sums those counts per window
     * of the inner TUMBLE_ROWTIME. The single expected output row is {@code "9"}.
     */
    @Test
    public void testCascadingTumbleWindow() {
        // Column expressions for T1; the first one is marked with rowtime().
        final Expression[] columns = new Expression[]{
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime(),
                package$.MODULE$.symbol2FieldExpression(symbol$2),
                package$.MODULE$.symbol2FieldExpression(symbol$3),
                package$.MODULE$.symbol2FieldExpression(symbol$4),
                package$.MODULE$.symbol2FieldExpression(symbol$5),
                package$.MODULE$.symbol2FieldExpression(symbol$6),
                package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        tEnv().registerTable(
                "T1",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(data(), new WindowAggregateITCase$$anon$8(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT SUM(cnt)\n        |FROM (\n        |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS ts\n        |  FROM T1\n        |  GROUP BY `int`, `string`, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        |)\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();

        // Collect the query result as an append stream and run the job.
        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        final String[] expectedRows = new String[]{"9"};
        Assert.assertEquals(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$),
                sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs a grouped SESSION-window aggregation over a small inline data set of
     * (rowtime, int, string, name) rows and compares the sorted append-stream output with the
     * expected rows.
     */
    @Test
    public void testEventTimeSessionWindow() {
        // Inline input rows; the timestamps are deliberately not in ascending order.
        final Tuple4[] rows = new Tuple4[]{
                new Tuple4(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(1), "Hello", "a"),
                new Tuple4(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello", "b"),
                new Tuple4(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(8), "Hello", "a"),
                new Tuple4(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(9), "Hello World", "b"),
                new Tuple4(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(4), "Hello", "c"),
                new Tuple4(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(16), "Hello", "d")
        };
        // Column expressions for T1; the first one is marked with rowtime().
        final Expression[] columns = new Expression[]{
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime(),
                package$.MODULE$.symbol2FieldExpression(symbol$2),
                package$.MODULE$.symbol2FieldExpression(symbol$6),
                package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        tEnv().registerTable(
                "T1",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(
                                        List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(rows)),
                                        new WindowAggregateITCase$$anon$9(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),\n        |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  SUM(`int`),\n        |  COUNT(DISTINCT name)\n        |FROM T1\n        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();

        // Collect the query result as an append stream and run the job.
        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted before comparison, so emission order does not matter.
        final String[] expectedRows = new String[]{
                "Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1",
                "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1",
                "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3"
        };
        Assert.assertEquals(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$),
                sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs a grouped TUMBLE-window aggregation with late firing enabled (via
     * {@code withLateFireDelay}) and an idle-state retention setting, writes the result into a
     * {@link TestingUpsertTableSink} keyed on the first two output columns, and compares the
     * sorted upsert results with the expected rows.
     */
    @Test
    public void testEventTimeTumblingWindowWithAllowLateness() {
        tEnv().getConfig().setIdleStateRetentionTime(Time.milliseconds(10L), Time.minutes(6L));
        withLateFireDelay(tEnv().getConfig(), Time.of(0L, TimeUnit.NANOSECONDS));

        // Inline (long, int, string) rows; the long value is also used as the rowtime source below.
        final Tuple3[] rows = new Tuple3[]{
                new Tuple3(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(1), "Hi"),
                new Tuple3(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(2), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(3), "Hello world"),
                new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(3), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(3), "Hello world"),
                new Tuple3(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(4), "Hello world"),
                new Tuple3(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToInteger(1), "Hi")
        };
        // Column expressions for T1; here the rowtime() column is appended last.
        final Expression[] columns = new Expression[]{
                package$.MODULE$.symbol2FieldExpression(symbol$8),
                package$.MODULE$.symbol2FieldExpression(symbol$2),
                package$.MODULE$.symbol2FieldExpression(symbol$6),
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime()
        };
        tEnv().registerTable(
                "T1",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(
                                        List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(rows)),
                                        new WindowAggregateITCase$$anon$10(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(0L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        // The query refers to this user-defined aggregate by name.
        tEnv().registerFunction(
                "weightAvgFun",
                new JavaUserDefinedAggFunctions.WeightedAvg(),
                BasicTypeInfo.getInfoFor(Long.class),
                TypeExtractor.createTypeInfo(JavaUserDefinedAggFunctions.WeightedAvgAccum.class));

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(DISTINCT `long`),\n        |  COUNT(`int`),\n        |  CAST(AVG(`int`) AS INT),\n        |  weightAvgFun(`long`, `int`),\n        |  MIN(`int`),\n        |  MAX(`int`),\n        |  SUM(`int`)\n        |FROM T1\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        final Table result = tEnv().sqlQuery(query);

        // One sink field type per selected column.
        final TypeInformation<?>[] fieldTypes = {
                Types.STRING(),
                Types.LOCAL_DATE_TIME(),
                Types.LOCAL_DATE_TIME(),
                Types.LONG(),
                Types.LONG(),
                Types.INT(),
                Types.LONG(),
                Types.INT(),
                Types.INT(),
                Types.INT()
        };
        final TestingUpsertTableSink unconfiguredSink = new TestingUpsertTableSink(new int[]{0, 1});
        // Field names are derived from the indices of fieldTypes by the generated anonymous function.
        final String[] fieldNames = (String[]) ((TraversableOnce) Predef$.MODULE$
                .refArrayOps(fieldTypes)
                .indices()
                .map(new WindowAggregateITCase$$anonfun$1(this), IndexedSeq$.MODULE$.canBuildFrom()))
                .toArray(ClassTag$.MODULE$.apply(String.class));
        final TestingUpsertTableSink sink = unconfiguredSink.configure(fieldNames, fieldTypes);

        tEnv().registerTableSink("MySink", sink);
        tEnv().insertInto("MySink", result);
        tEnv().execute("test");

        // Compare as newline-joined, sorted text.
        final String[] expectedRows = new String[]{
                "Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1,1,1,1,1,1,1",
                "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,2,3,2,3,2,3,7",
                "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1,1,3,16,3,3,3",
                "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,2,2,3,8,3,4,7"
        };
        final String expectedText = ((TraversableOnce) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$)).mkString("\n");
        final String actualText = ((TraversableOnce) sink.getUpsertResults().sorted(Ordering$String$.MODULE$)).mkString("\n");
        Assert.assertEquals(expectedText, actualText);
    }

    /**
     * Runs {@code COUNT(DISTINCT b)} grouped by {@code c} and a SESSION window over an inline
     * data set of (a, b, c) rows with an appended rowtime column, and compares the sorted
     * append-stream output with the expected rows.
     */
    @Test
    public void testDistinctAggWithMergeOnEventTimeSessionGroupWindow() {
        // Inline input rows; the timestamps are deliberately not in ascending order.
        final Tuple3[] rows = new Tuple3[]{
                new Tuple3(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(2), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(2), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(10L), BoxesRunTime.boxToInteger(3), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(9), "Hello World"),
                new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(1), "Hello"),
                new Tuple3(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(16), "Hello")
        };
        // Column expressions for MyTable; the rowtime() column is appended last.
        final Expression[] columns = new Expression[]{
                package$.MODULE$.symbol2FieldExpression(symbol$9),
                package$.MODULE$.symbol2FieldExpression(symbol$10),
                package$.MODULE$.symbol2FieldExpression(symbol$11),
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime()
        };
        tEnv().registerTable(
                "MyTable",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(
                                        List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(rows)),
                                        new WindowAggregateITCase$$anon$11(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c,\n        |   COUNT(DISTINCT b),\n        |   SESSION_END(rowtime, INTERVAL '0.005' SECOND)\n        |FROM MyTable\n        |GROUP BY c, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();

        // Collect the query result as an append stream and run the job.
        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted before comparison, so emission order does not matter.
        final String[] expectedRows = new String[]{
                "Hello World,1,1970-01-01T00:00:00.014",
                "Hello,1,1970-01-01T00:00:00.021",
                "Hello,3,1970-01-01T00:00:00.015"
        };
        Assert.assertEquals(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$),
                sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Computes MAX/MIN of rowtime per {@code string}/{@code int} group inside a TUMBLE window,
     * then re-aggregates those values per {@code string} with early firing switched on, and
     * compares the sorted retract-stream results with the expected rows.
     */
    @Test
    public void testMinMaxWithTumblingWindow() {
        // Column expressions for T1; the first one is marked with rowtime().
        final Expression[] columns = new Expression[]{
                package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime(),
                package$.MODULE$.symbol2FieldExpression(symbol$2),
                package$.MODULE$.symbol2FieldExpression(symbol$3),
                package$.MODULE$.symbol2FieldExpression(symbol$4),
                package$.MODULE$.symbol2FieldExpression(symbol$5),
                package$.MODULE$.symbol2FieldExpression(symbol$6),
                package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        tEnv().registerTable(
                "T1",
                package$.MODULE$
                        .dataStreamConversions(
                                failingDataSource(data(), new WindowAggregateITCase$$anon$12(this))
                                        .assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L)))
                        .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(columns)));

        // Enable early firing with a 1000 ms delay through the raw configuration keys.
        tEnv().getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
        tEnv().getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        | MAX(max_ts),\n        | MIN(min_ts),\n        | `string`\n        |FROM(\n        | SELECT\n        | `string`,\n        | `int`,\n        | MAX(rowtime) as max_ts,\n        | MIN(rowtime) as min_ts\n        | FROM T1\n        | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))\n        |GROUP BY `string`\n      ")).stripMargin();

        // The outer aggregation is not windowed, so the result is consumed as a retract stream.
        final TestingRetractSink sink = new TestingRetractSink();
        package$.MODULE$
                .tableConversions(tEnv().sqlQuery(query))
                .toRetractStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted before comparison, so emission order does not matter.
        final String[] expectedRows = new String[]{
                "1970-01-01T00:00:00.001,1970-01-01T00:00:00.001,Hi",
                "1970-01-01T00:00:00.002,1970-01-01T00:00:00.002,Hallo",
                "1970-01-01T00:00:00.007,1970-01-01T00:00:00.003,Hello",
                "1970-01-01T00:00:00.016,1970-01-01T00:00:00.008,Hello world",
                "1970-01-01T00:00:00.032,1970-01-01T00:00:00.032,null"
        };
        Assert.assertEquals(
                Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$),
                sink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Enables late-fire emission on {@code tableConfig} and stores the given interval, in
     * milliseconds, as the late-fire delay.
     *
     * @param tableConfig the configuration to update
     * @param time the late-fire interval; converted with {@code toMilliseconds()}
     * @throws RuntimeException if the config already yields a late-fire delay that differs from
     *     the requested one
     */
    private void withLateFireDelay(TableConfig tableConfig, Time time) {
        final long requestedMillis = time.toMilliseconds();
        // A null result is treated as "nothing to conflict with" (presumably: delay not set yet).
        final Long configuredDelay = TableConfigUtils.getMillisecondFromConfigDuration(
                tableConfig, WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY());

        final boolean conflictsWithExisting =
                configuredDelay != null && configuredDelay.longValue() != requestedMillis;
        if (conflictsWithExisting) {
            throw new RuntimeException("Currently not support different lateFireInterval configs in one job");
        }

        tableConfig.getConfiguration().setBoolean(
                WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_ENABLED(), true);
        tableConfig.getConfiguration().setString(
                WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY(), requestedMillis + " ms");
    }

    public WindowAggregateITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
        this.data = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple7[]{new Tuple7(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(1.0d), BoxesRunTime.boxToFloat(1.0f), new BigDecimal("1"), "Hi", "a"), new Tuple7(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToDouble(2.0d), BoxesRunTime.boxToFloat(2.0f), new BigDecimal("2"), "Hallo", "a"), new Tuple7(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToDouble(2.0d), BoxesRunTime.boxToFloat(2.0f), new BigDecimal("2"), "Hello", "a"), new Tuple7(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(5), BoxesRunTime.boxToDouble(5.0d), BoxesRunTime.boxToFloat(5.0f), new BigDecimal("5"), "Hello", "a"), new Tuple7(BoxesRunTime.boxToLong(7L), BoxesRunTime.boxToInteger(3), BoxesRunTime.boxToDouble(3.0d), BoxesRunTime.boxToFloat(3.0f), new BigDecimal("3"), "Hello", "b"), new Tuple7(BoxesRunTime.boxToLong(6L), BoxesRunTime.boxToInteger(5), BoxesRunTime.boxToDouble(5.0d), BoxesRunTime.boxToFloat(5.0f), new BigDecimal("5"), "Hello", "a"), new Tuple7(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(3), BoxesRunTime.boxToDouble(3.0d), BoxesRunTime.boxToFloat(3.0f), new BigDecimal("3"), "Hello world", "a"), new Tuple7(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(4), BoxesRunTime.boxToDouble(4.0d), BoxesRunTime.boxToFloat(4.0f), new BigDecimal("4"), "Hello world", "b"), new Tuple7(BoxesRunTime.boxToLong(32L), BoxesRunTime.boxToInteger(4), BoxesRunTime.boxToDouble(4.0d), BoxesRunTime.boxToFloat(4.0f), new BigDecimal("4"), (Object) null, (Object) null)}));
    }
}
