package org.apache.flink.table.planner.plan.stream.sql;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: MiniBatchIntervalInferTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%b\u0001B\u0001\u0003\u0001M\u0011!$T5oS\n\u000bGo\u00195J]R,'O^1m\u0013:4WM\u001d+fgRT!a\u0001\u0003\u0002\u0007M\fHN\u0003\u0002\u0006\r\u000511\u000f\u001e:fC6T!a\u0002\u0005\u0002\tAd\u0017M\u001c\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\t\u0003\u0015)H/\u001b7t\u0013\tIbCA\u0007UC\ndW\rV3ti\n\u000b7/\u001a\u0005\u00067\u0001!\t\u0001H\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003u\u0001\"A\b\u0001\u000e\u0003\tAq\u0001\t\u0001C\u0002\u0013%\u0011%\u0001\u0003vi&dW#\u0001\u0012\u0011\u0005U\u0019\u0013B\u0001\u0013\u0017\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u00191\u0003\u0001)A\u0005E\u0005)Q\u000f^5mA!9\u0001\u0006\u0001b\u0001\n\u0003I\u0013AB*U%&su)F\u0001+!\tY\u0003'D\u0001-\u0015\tic&A\u0004m_\u001eL7-\u00197\u000b\u0005=R\u0011!\u0002;za\u0016\u001c\u0018BA\u0019-\u0005-1\u0016M]\"iCJ$\u0016\u0010]3\t\rM\u0002\u0001\u0015!\u0003+\u0003\u001d\u0019FKU%O\u000f\u0002Bq!\u000e\u0001C\u0002\u0013\u0005a'\u0001\u0003M\u001f:;U#A\u001c\u0011\u0005-B\u0014BA\u001d-\u0005)\u0011\u0015nZ%oiRK\b/\u001a\u0005\u0007w\u0001\u0001\u000b\u0011B\u001c\u0002\u000b1{ej\u0012\u0011\t\u000fu\u0002!\u0019!C\u0001}\u0005\u0019\u0011J\u0014+\u0016\u0003}\u0002\"a\u000b!\n\u0005\u0005c#aB%oiRK\b/\u001a\u0005\u0007\u0007\u0002\u0001\u000b\u0011B \u0002\t%sE\u000b\t\u0005\u0006\u000b\u0002!\tAR\u0001\u0006g\u0016$X\u000f\u001d\u000b\u0002\u000fB\u0011\u0001jS\u0007\u0002\u0013*\t!*A\u0003tG\u0006d\u0017-\u0003\u0002M\u0013\n!QK\\5uQ\t!e\n\u0005\u0002P%6\t\u0001K\u0003\u0002R!\u0005)!.\u001e8ji&\u00111\u000b\u0015\u0002\u0007\u0005\u00164wN]3\t\u000bU\u0003A\u0011\u0001$\u0002#Q,7\u000f^'j]&\u0014\u0015\r^2i\u001f:d\u0017\u0010\u000b\u0002U/B\u0011q\nW\u0005\u00033B\u0013A\u0001V3ti\")1\f\u0001C\u0001\r\u0006\u0001C/Z:u%\u0016$WO\u001c3b]R<\u0016\r^3s[\u0006\u00148\u000eR3gS:LG/[8oQ\tQv\u000bC\u0003_\u0001\u0011\u0005a)A\fuKN$x+\u001b8e_^<\u0016\u000e\u001e5FCJd\u0017PR5sK\"\u0012Ql\u0016\u0005\u0006C\u0002!\tAR\u0001\u0012i\u0016\u001cHoV5oI><8)Y:dC\u0012,\u0007F\u00011X\u0011\u0015!\u0007\u0001\"\u0001G\u0003m!Xm\u001d;XS:$wn\u001e&pS:<\u0016\u000e\u001e5NS:L')\u0019;dQ\"\u00121m\u0016\u0005\u0006O\u0002!\tAR\u0001!i\u0016\u001cHOU8xi&lWMU8xg>3XM],ji\"l\u0015N\\5CCR\u001c\u0007\u000e\u000b\u0002g/\")!\u000e\u0001C\u0001\r\u0006QC/Z:u)\u0016l\u0007o\u001c:bYR\u000b'\r\\3Gk:\u001cG/[8o\u0015>LgnV5uQ6Kg.\u001b\"bi\u000eD\u0007FA5X\u0011\u0015i\u0007\u0001\"\u0001G\u0003\u0001\"Xm\u001d;Nk2$\u0018n\u00149fe\u0006$xN\u001d(fK\u0012\u001cx+\u0019;fe6\f'o[\u0019)\u00051<\u0006\"\u00029\u0001\t\u00031\u0015\u0001\t;fgRlU\u000f\u001c;j\u001fB,'/\u0019;pe:+W\rZ:XCR,'/\\1sWJB#a\\,\t\u000bM\u0004A\u0011\u0001$\u0002AQ,7\u000f^'vYRLw\n]3sCR|'OT3fIN<\u0016\r^3s[\u0006\u00148n\r\u0015\u0003e^CQA\u001e\u0001\u0005\u0002\u0019\u000bA\u0004^3ti6+H\u000e^5qY\u0016<\u0016N\u001c3po\u0006;wM]3hCR,7\u000f\u000b\u0002v/\")\u0011\u0010\u0001C\u0001\r\u0006!C/Z:u\u001b&t\u0017NQ1uG\"|e\u000eR1uCN#(/Z1n/&$\bNU8x)&lW\r\u000b\u0002y/\")A\u0010\u0001C\u0001\r\u0006qC/Z:u\u001fZ,'oV5oI><X*\u001b8j\u0005\u0006$8\r[(o\t\u0006$\u0018m\u0015;sK\u0006lw+\u001b;i%><H+[7fQ\tYx\u000b\u0003\u0004��\u0001\u0011%\u0011\u0011A\u0001\u0013o&$\b.R1sYf4\u0015N]3EK2\f\u0017\u0010F\u0003H\u0003\u0007\t\u0019\u0002C\u0004\u0002\u0006y\u0004\r!a\u0002\u0002\u0017Q\f'\r\\3D_:4\u0017n\u001a\t\u0005\u0003\u0013\ty!\u0004\u0002\u0002\f)\u0019\u0011Q\u0002\u0006\u0002\u0007\u0005\u0004\u0018.\u0003\u0003\u0002\u0012\u0005-!a\u0003+bE2,7i\u001c8gS\u001eDq!!\u0006\u007f\u0001\u0004\t9\"\u0001\u0005j]R,'O^1m!\u0011\tI\"!\n\u000e\u0005\u0005m!\u0002BA\u000f\u0003?\tA\u0001^5nK*!\u0011\u0011EA\u0012\u0003\u0019\u0019w.\\7p]*\u0019\u0011Q\u0002\u0007\n\t\u0005\u001d\u00121\u0004\u0002\u0005)&lW\r")
/* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.class */
public class MiniBatchIntervalInferTest extends TableTestBase {
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());
    private final VarCharType STRING = new VarCharType(Integer.MAX_VALUE);
    private final BigIntType LONG = new BigIntType();
    private final IntType INT = new IntType();
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("a");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static Symbol symbol$3 = Symbol$.MODULE$.apply("c");
    private static Symbol symbol$4 = Symbol$.MODULE$.apply("proctime");
    private static Symbol symbol$5 = Symbol$.MODULE$.apply("rowtime");
    private static Symbol symbol$6 = Symbol$.MODULE$.apply("id1");
    private static Symbol symbol$7 = Symbol$.MODULE$.apply("text");
    private static Symbol symbol$8 = Symbol$.MODULE$.apply("id2");
    private static Symbol symbol$9 = Symbol$.MODULE$.apply("cnt");
    private static Symbol symbol$10 = Symbol$.MODULE$.apply("name");
    private static Symbol symbol$11 = Symbol$.MODULE$.apply("goods");
    private static Symbol symbol$12 = Symbol$.MODULE$.apply("long");
    private static Symbol symbol$13 = Symbol$.MODULE$.apply("int");
    private static Symbol symbol$14 = Symbol$.MODULE$.apply("str");

    private StreamTableTestUtil util() {
        return this.util;
    }

    public VarCharType STRING() {
        return this.STRING;
    }

    public BigIntType LONG() {
        return this.LONG;
    }

    public IntType INT() {
        return this.INT;
    }

    @Before
    public void setup() {
        util().addDataStream("MyTable1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime(), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()}), new MiniBatchIntervalInferTest$$anon$7(this));
        util().addDataStream("MyTable2", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime(), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()}), new MiniBatchIntervalInferTest$$anon$8(this));
        util().tableEnv().getConfig().getConfiguration().setBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
    }

    @Test
    public void testMiniBatchOnly() {
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().verifyPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b");
    }

    @Test
    public void testRedundantWatermarkDefinition() {
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().addTableWithWatermark("MyTable3", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().verifyPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b");
    }

    @Test
    public void testWindowWithEarlyFire() {
        TableConfig config = util().tableEnv().getConfig();
        config.getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        withEarlyFireDelay(config, Time.milliseconds(500L));
        util().addTableWithWatermark("MyTable3", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT b, SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,\n        |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end\n        |   FROM MyTable3\n        |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        | )\n        | GROUP BY b\n      ")).stripMargin());
    }

    @Test
    public void testWindowCascade() {
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s");
        util().addTableWithWatermark("MyTable3", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT b,\n        |   SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt\n        |   FROM MyTable3\n        |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        | )\n        | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)\n      ")).stripMargin());
    }

    @Test
    public void testWindowJoinWithMiniBatch() {
        util().addTableWithWatermark("LeftT", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().addTableWithWatermark("RightT", util().tableEnv().scan(new String[]{"MyTable2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT b, COUNT(a)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b\n        |   FROM\n        |     LeftT as t1 JOIN RightT as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b\n      ")).stripMargin());
    }

    @Test
    public void testRowtimeRowsOverWithMiniBatch() {
        util().addTableWithWatermark("MyTable3", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT cnt, COUNT(c)\n        | FROM (\n        |   SELECT c, COUNT(a)\n        |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM MyTable3\n        | )\n        | GROUP BY cnt\n      ")).stripMargin());
    }

    @Test
    public void testTemporalTableFunctionJoinWithMiniBatch() {
        util().addTableWithWatermark("Orders", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().addTableWithWatermark("RatesHistory", util().tableEnv().scan(new String[]{"MyTable2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().addFunction("Rates", util().tableEnv().scan(new String[]{"RatesHistory"}).createTemporalTableFunction("rowtime", "b"), TypeExtractor.createTypeInfo(Row.class));
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT r_a, COUNT(o_a)\n        | FROM (\n        |   SELECT o.a as o_a, r.a as r_a\n        |   FROM Orders As o,\n        |   LATERAL TABLE (Rates(o.rowtime)) as r\n        |   WHERE o.b = r.b\n        | )\n        | GROUP BY r_a\n      ")).stripMargin());
    }

    @Test
    public void testMultiOperatorNeedsWatermark1() {
        util().addTableWithWatermark("LeftT", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().addTableWithWatermark("RightT", util().tableEnv().scan(new String[]{"MyTable2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT\n        |   b, COUNT(a),\n        |   TUMBLE_START(rt, INTERVAL '5' SECOND),\n        |   TUMBLE_END(rt, INTERVAL '5' SECOND)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt\n        |   FROM\n        |     LeftT as t1 JOIN RightT as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)\n      ")).stripMargin());
    }

    @Test
    public void testMultiOperatorNeedsWatermark2() {
        util().addTableWithWatermark("LeftT", util().tableEnv().scan(new String[]{"MyTable1"}), "rowtime", 0L);
        util().addTableWithWatermark("RightT", util().tableEnv().scan(new String[]{"MyTable2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT b, COUNT(a)\n        | OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)\n        | FROM (\n        |  SELECT t1.a as a, t1.b as b, t1.rt as rt\n        |  FROM\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt\n        |    FROM LeftT\n        |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)\n        |  ) as t1\n        |  JOIN\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |    FROM RightT\n        |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |  ) as t2\n        |  ON\n        |    t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND\n        |    t2.rt + INTERVAL '10' SECOND\n        | )\n      ")).stripMargin());
    }

    @Test
    public void testMultiOperatorNeedsWatermark3() {
        util().addTableWithWatermark("RightT", util().tableEnv().scan(new String[]{"MyTable2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        |  SELECT t1.a, t1.b\n        |  FROM (\n        |    SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a\n        |  ) as t1\n        |  JOIN (\n        |    SELECT b, COUNT(a) as a\n        |    FROM (\n        |      SELECT b, COUNT(a) as a,\n        |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |      FROM RightT\n        |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |    )\n        |    GROUP BY b\n        |  ) as t2\n        |  ON t1.a = t2.a\n      ")).stripMargin());
    }

    @Test
    public void testMultipleWindowAggregates() {
        util().addDataStream("T1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$6), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), package$.MODULE$.symbol2FieldExpression(symbol$7)}), new MiniBatchIntervalInferTest$$anon$9(this));
        util().addDataStream("T2", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$8), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime(), package$.MODULE$.symbol2FieldExpression(symbol$9), package$.MODULE$.symbol2FieldExpression(symbol$10), package$.MODULE$.symbol2FieldExpression(symbol$11)}), new MiniBatchIntervalInferTest$$anon$10(this));
        util().addTableWithWatermark("T3", util().tableEnv().scan(new String[]{"T1"}), "rowtime", 0L);
        util().addTableWithWatermark("T4", util().tableEnv().scan(new String[]{"T2"}), "rowtime", 0L);
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "500 ms");
        util().tableEnv().getConfig().getConfiguration().setLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 300L);
        util().tableEnv().registerTable("TempTable1", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id1, T3.rowtime AS ts, text\n        |  FROM T3, T4\n        |WHERE id1 = id2\n        |      AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE\n        |      AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE\n      ")).stripMargin()));
        util().tableEnv().registerTable("TempTable2", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id1,\n        |    LISTAGG(text, '#') as text,\n        |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts\n        |FROM TempTable1\n        |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1\n      ")).stripMargin()));
        util().writeToSink(util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id1,\n        |    LISTAGG(text, '*')\n        |FROM TempTable2\n        |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1\n      ")).stripMargin()), util().createAppendTableSink(new String[]{"a", "b"}, new LogicalType[]{INT(), STRING()}), "appendSink1");
        util().writeToSink(util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id1,\n        |    LISTAGG(text, '-')\n        |FROM TempTable1\n        |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1\n      ")).stripMargin()), util().createAppendTableSink(new String[]{"a", "b"}, new LogicalType[]{INT(), STRING()}), "appendSink2");
        util().writeToSink(util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id1,\n        |    COUNT(text)\n        |FROM TempTable2\n        |GROUP BY id1\n      ")).stripMargin()), util().createRetractTableSink(new String[]{"a", "b"}, new LogicalType[]{INT(), LONG()}), "appendSink3");
        util().verifyExplain();
    }

    @Test
    public void testMiniBatchOnDataStreamWithRowTime() {
        util().addDataStream("T1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$12), package$.MODULE$.symbol2FieldExpression(symbol$13), package$.MODULE$.symbol2FieldExpression(symbol$14), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()}), new MiniBatchIntervalInferTest$$anon$11(this));
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT long,\n        |  COUNT(str) as cnt,\n        |  TUMBLE_END(rowtime, INTERVAL '10' SECOND) as rt\n        |FROM T1\n        |GROUP BY long, TUMBLE(rowtime, INTERVAL '10' SECOND)\n      ")).stripMargin());
    }

    @Test
    public void testOverWindowMiniBatchOnDataStreamWithRowTime() {
        util().addDataStream("T1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$12), package$.MODULE$.symbol2FieldExpression(symbol$13), package$.MODULE$.symbol2FieldExpression(symbol$14), package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()}), new MiniBatchIntervalInferTest$$anon$12(this));
        util().tableEnv().getConfig().getConfiguration().setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s");
        util().verifyPlan(new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT cnt, COUNT(`int`)\n        | FROM (\n        |   SELECT `int`,\n        |    COUNT(str) OVER\n        |      (PARTITION BY long ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM T1\n        | )\n        | GROUP BY cnt\n      ")).stripMargin());
    }

    private void withEarlyFireDelay(TableConfig tableConfig, Time time) {
        long milliseconds = time.toMilliseconds();
        Long millisecondFromConfigDuration = TableConfigUtils.getMillisecondFromConfigDuration(tableConfig, WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY());
        if (millisecondFromConfigDuration != null && !BoxesRunTime.equalsNumObject(millisecondFromConfigDuration, BoxesRunTime.boxToLong(milliseconds))) {
            throw new RuntimeException("Currently not support different earlyFireInterval configs in one job");
        }
        tableConfig.getConfiguration().setBoolean(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED(), true);
        tableConfig.getConfiguration().setString(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY(), new StringBuilder().append(milliseconds).append(" ms").toString());
    }
}
