package org.apache.flink.table.planner.plan.batch.sql;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.TableFunc1;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

/* compiled from: DagOptimizationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=b\u0001B\u0001\u0003\u0001M\u00111\u0003R1h\u001fB$\u0018.\\5{CRLwN\u001c+fgRT!a\u0001\u0003\u0002\u0007M\fHN\u0003\u0002\u0006\r\u0005)!-\u0019;dQ*\u0011q\u0001C\u0001\u0005a2\fgN\u0003\u0002\n\u0015\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0006\r\u0003\u0015!\u0018M\u00197f\u0015\tia\"A\u0003gY&t7N\u0003\u0002\u0010!\u00051\u0011\r]1dQ\u0016T\u0011!E\u0001\u0004_J<7\u0001A\n\u0003\u0001Q\u0001\"!\u0006\r\u000e\u0003YQ!a\u0006\u0005\u0002\u000bU$\u0018\u000e\\:\n\u0005e1\"!\u0004+bE2,G+Z:u\u0005\u0006\u001cX\rC\u0003\u001c\u0001\u0011\u0005A$\u0001\u0004=S:LGO\u0010\u000b\u0002;A\u0011a\u0004A\u0007\u0002\u0005!9\u0001\u0005\u0001b\u0001\n\u0013\t\u0013\u0001B;uS2,\u0012A\t\t\u0003+\rJ!\u0001\n\f\u0003%\t\u000bGo\u00195UC\ndW\rV3tiV#\u0018\u000e\u001c\u0005\u0007M\u0001\u0001\u000b\u0011\u0002\u0012\u0002\u000bU$\u0018\u000e\u001c\u0011\t\u000f!\u0002!\u0019!C\u0001S\u000511\u000b\u0016*J\u001d\u001e+\u0012A\u000b\t\u0003WAj\u0011\u0001\f\u0006\u0003[9\nq\u0001\\8hS\u000e\fGN\u0003\u00020\u0015\u0005)A/\u001f9fg&\u0011\u0011\u0007\f\u0002\f-\u0006\u00148\t[1s)f\u0004X\r\u0003\u00044\u0001\u0001\u0006IAK\u0001\b'R\u0013\u0016JT$!\u0011\u001d)\u0004A1A\u0005\u0002Y\nA\u0001T(O\u000fV\tq\u0007\u0005\u0002,q%\u0011\u0011\b\f\u0002\u000b\u0005&<\u0017J\u001c;UsB,\u0007BB\u001e\u0001A\u0003%q'A\u0003M\u001f:;\u0005\u0005C\u0004>\u0001\t\u0007I\u0011\u0001 \u0002\u0007%sE+F\u0001@!\tY\u0003)\u0003\u0002BY\t9\u0011J\u001c;UsB,\u0007BB\"\u0001A\u0003%q(\u0001\u0003J\u001dR\u0003\u0003bB#\u0001\u0005\u0004%\tAR\u0001\u0007\t>+&\tT#\u0016\u0003\u001d\u0003\"a\u000b%\n\u0005%c#A\u0003#pk\ndW\rV=qK\"11\n\u0001Q\u0001\n\u001d\u000bq\u0001R(V\u00052+\u0005\u0005C\u0004N\u0001\t\u0007I\u0011\u0001(\u0002\u0013QKU*R*U\u00036\u0003V#A(\u0011\u0005-\u0002\u0016BA)-\u00055!\u0016.\\3ti\u0006l\u0007\u000fV=qK\"11\u000b\u0001Q\u0001\n=\u000b!\u0002V%N\u000bN#\u0016)\u0014)!\u0011\u0015)\u0006\u0001\"\u0001W\u0003=!Xm\u001d;TS:<G.Z*j].\fD#A,\u0011\u0005a[V\"A-\u000b\u0003i\u000bQa]2bY\u0006L!\u0001X-\u0003\tUs\u0017\u000e\u001e\u0015\u0003)z\u0003\"a\u00182\u000e\u0003\u0001T!!\u0019\t\u0002\u000b),h.\u001b;\n\u0005\r\u0004'\u0001\u0002+fgRDQ!\u001a\u0001\u0005\u0002Y\u000bq\u0002^3tiNKgn\u001a7f'&t7N\r\u0015\u0003IzCQ\u0001\u001b\u0001\u0005\u0002Y\u000bq\u0002^3tiNKgn\u001a7f'&t7n\r\u0015\u0003OzCQa\u001b\u0001\u0005\u0002Y\u000bq\u0002^3tiNKgn\u001a7f'&t7\u000e\u000e\u0015\u0003UzCQA\u001c\u0001\u0005\u0002Y\u000ba\u0003^3tiNKgn\u001a7f'&t7nV5uQV#EK\u0012\u0015\u0003[zCQ!\u001d\u0001\u0005\u0002Y\u000b!\u0004^3tiNKgn\u001a7f'&t7n\u00159mSR|e.\u00168j_:D#\u0001\u001d0\t\u000bQ\u0004A\u0011\u0001,\u0002\u001fQ,7\u000f^'vYRL7+\u001b8lgFB#a\u001d0\t\u000b]\u0004A\u0011\u0001,\u0002\u001fQ,7\u000f^'vYRL7+\u001b8lgJB#A\u001e0\t\u000bi\u0004A\u0011\u0001,\u0002\u001fQ,7\u000f^'vYRL7+\u001b8lgNB#!\u001f0\t\u000bu\u0004A\u0011\u0001,\u0002\u001fQ,7\u000f^'vYRL7+\u001b8lgRB#\u0001 0\t\r\u0005\u0005\u0001\u0001\"\u0001W\u0003=!Xm\u001d;Nk2$\u0018nU5oWN,\u0004FA@_\u0011\u0019\t9\u0001\u0001C\u0001-\u0006\u0019B/Z:u\u001bVdG/\u001b'fm\u0016dg+[3xg\"\u001a\u0011Q\u00010\t\r\u00055\u0001\u0001\"\u0001W\u0003Y!Xm\u001d;Nk2$\u0018nU5oWN<\u0016\u000e\u001e5V\tR3\u0005fAA\u0006=\"1\u00111\u0003\u0001\u0005\u0002Y\u000b1\u0004^3ti6+H\u000e^5TS:\\7o\u00159mSR|e.\u00168j_:\f\u0004fAA\t=\"1\u0011\u0011\u0004\u0001\u0005\u0002Y\u000b1\u0004^3ti6+H\u000e^5TS:\\7o\u00159mSR|e.\u00168j_:\u0014\u0004fAA\f=\"1\u0011q\u0004\u0001\u0005\u0002Y\u000b1\u0004^3ti6+H\u000e^5TS:\\7o\u00159mSR|e.\u00168j_:\u001c\u0004fAA\u000f=\"1\u0011Q\u0005\u0001\u0005\u0002Y\u000b1\u0004^3ti6+H\u000e^5TS:\\7o\u00159mSR|e.\u00168j_:$\u0004fAA\u0012=\"1\u00111\u0006\u0001\u0005\u0002Y\u000b\u0001\u0004^3ti6+H\u000e^5TS:\\7oV5uQ^Kg\u000eZ8xQ\r\tIC\u0018")
/* loaded from: input_file:org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.class */
public class DagOptimizationTest extends TableTestBase {
    private final BatchTableTestUtil util = batchTestUtil();
    private final VarCharType STRING;
    private final BigIntType LONG;
    private final IntType INT;
    private final DoubleType DOUBLE;
    private final TimestampType TIMESTAMP;
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("a");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static Symbol symbol$3 = Symbol$.MODULE$.apply("c");
    private static Symbol symbol$4 = Symbol$.MODULE$.apply("d");
    private static Symbol symbol$5 = Symbol$.MODULE$.apply("e");
    private static Symbol symbol$6 = Symbol$.MODULE$.apply("i");
    private static Symbol symbol$7 = Symbol$.MODULE$.apply("j");
    private static Symbol symbol$8 = Symbol$.MODULE$.apply("k");
    private static Symbol symbol$9 = Symbol$.MODULE$.apply("l");
    private static Symbol symbol$10 = Symbol$.MODULE$.apply("m");
    private static Symbol symbol$11 = Symbol$.MODULE$.apply("ts");
    private static Symbol symbol$12 = Symbol$.MODULE$.apply("f");

    private BatchTableTestUtil util() {
        return this.util;
    }

    public VarCharType STRING() {
        return this.STRING;
    }

    public BigIntType LONG() {
        return this.LONG;
    }

    public IntType INT() {
        return this.INT;
    }

    public DoubleType DOUBLE() {
        return this.DOUBLE;
    }

    public TimestampType TIMESTAMP() {
        return this.TIMESTAMP;
    }

    @Test
    public void testSingleSink1() {
        util().writeToSink(util().tableEnv().sqlQuery("SELECT c, COUNT(a) AS cnt FROM MyTable GROUP BY c"), util().createCollectTableSink(new String[]{"c", "cnt"}, new LogicalType[]{STRING(), LONG()}), "appendSink");
        util().verifyPlan();
    }

    @Test
    public void testSingleSink2() {
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10"));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b, c FROM MyTable WHERE a >= 0"));
        util().tableEnv().registerTable("table3", util().tableEnv().sqlQuery("SELECT a AS a2, c FROM table2 WHERE b >= 5"));
        util().tableEnv().registerTable("table4", util().tableEnv().sqlQuery("SELECT a AS a3, c as c1 FROM table2 WHERE b < 5"));
        util().tableEnv().registerTable("table5", util().tableEnv().sqlQuery("SELECT a1, b, c as c2 FROM table1, table3 WHERE a1 = a2"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT a1, b, c1 FROM table4, table5 WHERE a1 = a3"), util().createCollectTableSink(new String[]{"a1", "b", "c1"}, new LogicalType[]{INT(), LONG(), STRING()}), "appendSink");
        util().verifyPlan();
    }

    @Test
    public void testSingleSink3() {
        util().addDataStream("MyTable2", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.symbol2FieldExpression(symbol$4), package$.MODULE$.symbol2FieldExpression(symbol$5)}), new DagOptimizationTest$$anon$13(this));
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a AS a1, b as b1 FROM MyTable WHERE a <= 10"));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b1 FROM table1, MyTable2 WHERE a = a1"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT * FROM table1 UNION ALL SELECT * FROM table2"), util().createCollectTableSink(new String[]{"a1", "b1"}, new LogicalType[]{INT(), LONG()}), "appendSink");
        util().verifyPlan();
    }

    @Test
    public void testSingleSink4() {
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10"));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b, c FROM MyTable WHERE a >= 0"));
        util().tableEnv().registerTable("table3", util().tableEnv().sqlQuery("SELECT a AS a2, c FROM table2 WHERE b >= 5"));
        util().tableEnv().registerTable("table4", util().tableEnv().sqlQuery("SELECT a AS a3, c AS c1 FROM table2 WHERE b < 5"));
        util().tableEnv().registerTable("table5", util().tableEnv().sqlQuery("SELECT a1, b, c AS c2 from table1, table3 WHERE a1 = a2"));
        util().tableEnv().registerTable("table6", util().tableEnv().sqlQuery("SELECT a3, b as b1, c1 FROM table4, table5 WHERE a1 = a3"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT a1, b1, c1 FROM table1, table6 WHERE a1 = a3"), util().createCollectTableSink(new String[]{"a", "b", "c"}, new LogicalType[]{INT(), LONG(), STRING()}), "sink");
        util().verifyPlan();
    }

    @Test
    public void testSingleSinkWithUDTF() {
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$6), package$.MODULE$.symbol2FieldExpression(symbol$7), package$.MODULE$.symbol2FieldExpression(symbol$8), package$.MODULE$.symbol2FieldExpression(symbol$9), package$.MODULE$.symbol2FieldExpression(symbol$10)}), (TypeInformation) new DagOptimizationTest$$anon$14(this));
        util().addFunction("split", new TableFunc1(), BasicTypeInfo.getInfoFor(String.class));
        util().writeToSink(util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |select * from\n        |    (SELECT * FROM MyTable, MyTable1, MyTable2 WHERE b = e AND a = i) t,\n        |    LATERAL TABLE(split(c)) as T(s)\n      ")).stripMargin()), util().createCollectTableSink(new String[]{"a", "b", "c", "d", "e", "f", "i", "j", "k", "l", "m", "s"}, new LogicalType[]{INT(), LONG(), STRING(), INT(), LONG(), STRING(), INT(), LONG(), INT(), STRING(), LONG(), STRING()}), "sink");
        util().verifyPlan();
    }

    @Test
    public void testSingleSinkSplitOnUnion() {
        util().writeToSink(util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM (SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1)"), util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinks1() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT SUM(a) AS sum_a, c FROM MyTable GROUP BY c"));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT SUM(sum_a) AS total_sum FROM table1");
        Table sqlQuery2 = util().tableEnv().sqlQuery("SELECT MIN(sum_a) AS total_min FROM table1");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink1");
        util().writeToSink(sqlQuery2, util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinks2() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), false);
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.symbol2FieldExpression(symbol$4), package$.MODULE$.symbol2FieldExpression(symbol$5)}), (TypeInformation) new DagOptimizationTest$$anon$15(this));
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a as a1, b as b1 FROM MyTable WHERE a <= 10"));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b1 from table1, MyTable2 where a = a1"));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT * FROM table1 UNION ALL SELECT * FROM table2");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a", "b1"}, new LogicalType[]{INT(), LONG()}), "sink1");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a", "b1"}, new LogicalType[]{INT(), LONG()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinks3() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.symbol2FieldExpression(symbol$4), package$.MODULE$.symbol2FieldExpression(symbol$5)}), (TypeInformation) new DagOptimizationTest$$anon$16(this));
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a AS a1, b AS b1 FROM MyTable WHERE a <= 10"));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT a, b1 FROM table1, MyTable2 WHERE a = a1");
        util().tableEnv().registerTable("table2", sqlQuery);
        Table sqlQuery2 = util().tableEnv().sqlQuery("SELECT * FROM table1 UNION ALL SELECT * FROM table2");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a", "b1"}, new LogicalType[]{INT(), LONG()}), "sink1");
        util().writeToSink(sqlQuery2, util().createCollectTableSink(new String[]{"a", "b1"}, new LogicalType[]{INT(), LONG()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinks4() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT a as a1, b FROM MyTable WHERE a <= 10"));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b, c FROM MyTable WHERE a >= 0"));
        util().tableEnv().registerTable("table3", util().tableEnv().sqlQuery("SELECT a as a2, c FROM table2 WHERE b >= 5"));
        util().tableEnv().registerTable("table4", util().tableEnv().sqlQuery("SELECT a as a3, c as c1 FROM table2 WHERE b < 5"));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT a1, b, c as c2 FROM table1, table3 WHERE a1 = a2");
        util().tableEnv().registerTable("table5", sqlQuery);
        Table sqlQuery2 = util().tableEnv().sqlQuery("SELECT a1, b, c1 FROM table4, table5 WHERE a1 = a3");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a1", "b", "c2"}, new LogicalType[]{INT(), LONG(), STRING()}), "sink1");
        util().writeToSink(sqlQuery2, util().createCollectTableSink(new String[]{"a1", "b", "c1"}, new LogicalType[]{INT(), LONG(), STRING()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinks5() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);
        util().tableEnv().registerFunction("random_udf", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery("SELECT random_udf(a) AS a, c FROM MyTable"));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM table1");
        Table sqlQuery2 = util().tableEnv().sqlQuery("SELECT MIN(a) AS total_min FROM table1");
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink1");
        util().writeToSink(sqlQuery2, util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiLevelViews() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%hello%'");
        util().tableEnv().registerTable("TempTable1", sqlQuery);
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a", "b", "c"}, new LogicalType[]{INT(), LONG(), STRING()}), "sink1");
        util().tableEnv().registerTable("TempTable2", util().tableEnv().sqlQuery("SELECT a, b, c FROM MyTable WHERE c LIKE '%world%'"));
        util().tableEnv().registerTable("TempTable3", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, COUNT(a) AS cnt FROM (\n        | (SELECT * FROM TempTable1)\n        | UNION ALL\n        | (SELECT * FROM TempTable2)\n        |) t\n        |GROUP BY b\n      ")).stripMargin()));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT b, cnt FROM TempTable3 WHERE b < 4"), util().createCollectTableSink(new String[]{"b", "cnt"}, new LogicalType[]{LONG(), LONG()}), "sink2");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT b, cnt FROM TempTable3 WHERE b >=4 AND b < 6"), util().createCollectTableSink(new String[]{"b", "cnt"}, new LogicalType[]{LONG(), LONG()}), "sink3");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksWithUDTF() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().addFunction("split", new TableFunc1(), BasicTypeInfo.getInfoFor(String.class));
        util().tableEnv().registerTable("table1", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT  a, b - MOD(b, 300) AS b, c FROM MyTable\n        |WHERE b >= UNIX_TIMESTAMP('${startTime}')\n      ")).stripMargin()));
        util().tableEnv().registerTable("table2", util().tableEnv().sqlQuery("SELECT a, b, c1 AS c FROM table1, LATERAL TABLE(split(c)) AS T(c1) WHERE c <> '' "));
        util().tableEnv().registerTable("table3", util().tableEnv().sqlQuery("SELECT a, b, COUNT(DISTINCT c) AS total_c FROM table2 GROUP BY a, b"));
        util().tableEnv().registerTable("table4", util().tableEnv().sqlQuery("SELECT a, total_c FROM table3 UNION ALL SELECT a, 0 AS total_c FROM table1"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT * FROM table4 WHERE a > 50"), util().createCollectTableSink(new String[]{"a", "total_c"}, new LogicalType[]{INT(), LONG()}), "sink1");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT * FROM table4 WHERE a < 50"), util().createCollectTableSink(new String[]{"a", "total_c"}, new LogicalType[]{INT(), LONG()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksSplitOnUnion1() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().tableEnv().registerTable("TempTable", util().tableEnv().sqlQuery("SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable"), util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink1");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT MIN(a) AS total_min FROM TempTable"), util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksSplitOnUnion2() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), (TypeInformation) new DagOptimizationTest$$anon$17(this));
        util().tableEnv().registerTable("TempTable", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, c FROM MyTable\n        |UNION ALL\n        |SELECT d, f FROM MyTable1\n        |UNION ALL\n        |SELECT a, c FROM MyTable2\n      ")).stripMargin()));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable"), util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink1");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT MIN(a) AS total_min FROM TempTable"), util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink2");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT a FROM (SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1)"), util().createCollectTableSink(new String[]{"a"}, new LogicalType[]{INT()}), "sink3");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksSplitOnUnion3() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), (TypeInformation) new DagOptimizationTest$$anon$18(this));
        Table sqlQuery = util().tableEnv().sqlQuery("SELECT a, c FROM MyTable UNION ALL SELECT d, f FROM MyTable1");
        util().tableEnv().registerTable("TempTable", sqlQuery);
        util().writeToSink(sqlQuery, util().createCollectTableSink(new String[]{"a", "c"}, new LogicalType[]{INT(), STRING()}), "sink1");
        util().tableEnv().registerTable("TempTable1", util().tableEnv().sqlQuery("SELECT a, c FROM TempTable UNION ALL SELECT a, c FROM MyTable2"));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable1"), util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink2");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT MIN(a) AS total_min FROM TempTable1"), util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink3");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksSplitOnUnion4() {
        util().tableEnv().getConfig().getConfiguration().setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_UNIONALL_AS_BREAKPOINT_DISABLED(), true);
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), (TypeInformation) new DagOptimizationTest$$anon$19(this));
        util().tableEnv().registerTable("TempTable", util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, c FROM MyTable\n        |UNION ALL\n        |SELECT d, f FROM MyTable1\n        |UNION ALL\n        |SELECT a, c FROM MyTable2\n      ")).stripMargin()));
        util().writeToSink(util().tableEnv().sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable"), util().createCollectTableSink(new String[]{"total_sum"}, new LogicalType[]{INT()}), "sink1");
        util().writeToSink(util().tableEnv().sqlQuery("SELECT MIN(a) AS total_min FROM TempTable"), util().createCollectTableSink(new String[]{"total_min"}, new LogicalType[]{INT()}), "sink2");
        util().verifyPlan();
    }

    @Test
    public void testMultiSinksWithWindow() {
        util().addTableSource("MyTable2", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.symbol2FieldExpression(symbol$11)}), (TypeInformation) new DagOptimizationTest$$anon$20(this));
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |    a,\n        |    SUM (CAST (c AS DOUBLE)) AS sum_c,\n        |    TUMBLE_END(ts, INTERVAL '15' SECOND) AS `time`,\n        |    TUMBLE_START(ts, INTERVAL '15' SECOND) AS window_start,\n        |    TUMBLE_END (ts, INTERVAL '15' SECOND) AS window_end\n        |FROM\n        |    MyTable2\n        |GROUP BY\n        |    TUMBLE (ts, INTERVAL '15' SECOND), a\n      ")).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |    a,\n        |    SUM (CAST (c AS DOUBLE)) AS sum_c,\n        |    TUMBLE_END(ts, INTERVAL '15' SECOND) AS `time`\n        |FROM\n        |    MyTable2\n        |GROUP BY\n        |    TUMBLE (ts, INTERVAL '15' SECOND), a\n      ")).stripMargin();
        util().writeToSink(util().tableEnv().sqlQuery(stripMargin), util().createCollectTableSink(new String[]{"a", "sum_c", "time", "window_start", "window_end"}, new LogicalType[]{INT(), DOUBLE(), TIMESTAMP(), TIMESTAMP(), TIMESTAMP()}), "sink1");
        util().writeToSink(util().tableEnv().sqlQuery(stripMargin2), util().createCollectTableSink(new String[]{"a", "sum_c", "time"}, new LogicalType[]{INT(), DOUBLE(), TIMESTAMP()}), "sink2");
        util().verifyPlan();
    }

    public DagOptimizationTest() {
        util().addTableSource("MyTable", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$1), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3)}), (TypeInformation) new DagOptimizationTest$$anon$11(this));
        util().addTableSource("MyTable1", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$4), package$.MODULE$.symbol2FieldExpression(symbol$5), package$.MODULE$.symbol2FieldExpression(symbol$12)}), (TypeInformation) new DagOptimizationTest$$anon$12(this));
        this.STRING = new VarCharType(Integer.MAX_VALUE);
        this.LONG = new BigIntType();
        this.INT = new IntType();
        this.DOUBLE = new DoubleType();
        this.TIMESTAMP = new TimestampType(3);
    }
}
