package org.apache.flink.table.planner.runtime.stream.table;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.Symbol;
import scala.Symbol$;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$;
import scala.math.Ordering$Int$;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: TableSinkITCase.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ma\u0001B\u0001\u0003\u0001I\u0011q\u0002V1cY\u0016\u001c\u0016N\\6J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\tQ\u0001^1cY\u0016T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u0007-Q!\u0001D\u0007\u0002\u000b\u0019d\u0017N\\6\u000b\u00059y\u0011AB1qC\u000eDWMC\u0001\u0011\u0003\ry'oZ\u0002\u0001'\t\u00011\u0003\u0005\u0002\u0015/5\tQC\u0003\u0002\u0017\r\u0005)Q\u000f^5mg&\u0011\u0001$\u0006\u0002\u0012'R\u0014X-Y7j]\u001e$Vm\u001d;CCN,\u0007\"\u0002\u000e\u0001\t\u0003Y\u0012A\u0002\u001fj]&$h\bF\u0001\u001d!\ti\u0002!D\u0001\u0003\u0011\u0015y\u0002\u0001\"\u0001!\u0003m!Xm\u001d;BaB,g\u000eZ*j].|e.\u00119qK:$G+\u00192mKR\t\u0011\u0005\u0005\u0002#K5\t1EC\u0001%\u0003\u0015\u00198-\u00197b\u0013\t13E\u0001\u0003V]&$\bF\u0001\u0010)!\tIC&D\u0001+\u0015\tYs\"A\u0003kk:LG/\u0003\u0002.U\t!A+Z:u\u0011\u0015y\u0003\u0001\"\u0001!\u0003m!Xm\u001d;BaB,g\u000eZ*j].<\u0016\u000e\u001e5OKN$X\r\u001a*po\"\u0012a\u0006\u000b\u0005\u0006e\u0001!\t\u0001I\u0001(i\u0016\u001cH/\u00119qK:$7+\u001b8l\u001f:\f\u0005\u000f]3oIR\u000b'\r\\3G_JLeN\\3s\u0015>Lg\u000e\u000b\u00022Q!)Q\u0007\u0001C\u0001A\u0005qB/Z:u%\u0016$(/Y2u'&t7n\u00148Va\u0012\fG/\u001b8h)\u0006\u0014G.\u001a\u0015\u0003i!BQ\u0001\u000f\u0001\u0005\u0002\u0001\nA\u0004^3tiJ+GO]1diNKgn[(o\u0003B\u0004XM\u001c3UC\ndW\r\u000b\u00028Q!)1\b\u0001C\u0001A\u0005\tC/Z:u+B\u001cXM\u001d;TS:\\wJ\u001c(fgR,G-Q4he\u0016<\u0017\r^5p]\"\u0012!\b\u000b\u0005\u0006}\u0001!\t\u0001I\u0001\u001fi\u0016\u001cH/\u00169tKJ$8+\u001b8l\u001f:\f\u0005\u000f]3oI&tw\rV1cY\u0016D#!\u0010\u0015\t\u000b\u0005\u0003A\u0011\u0001\u0011\u0002[Q,7\u000f^+qg\u0016\u0014HoU5oW>s\u0017\t\u001d9f]\u0012Lgn\u001a+bE2,w+\u001b;i_V$h)\u001e7m\u0017\u0016L\u0018\u0007\u000b\u0002AQ!)A\t\u0001C\u0001A\u0005iC/Z:u+B\u001cXM\u001d;TS:\\wJ\\!qa\u0016tG-\u001b8h)\u0006\u0014G.Z,ji\"|W\u000f\u001e$vY2\\U-\u001f\u001a)\u0005\rC\u0003\"B$\u0001\t\u0003\u0001\u0013\u0001\u0007;fgR,\u0006o]3siNKgn[,ji\"4\u0015\u000e\u001c;fe\"\u0012a\t\u000b\u0005\u0006\u0015\u0002!\t\u0001I\u0001\u0011i\u0016\u001cH/T;mi&\u0014vn\u001e;j[\u0016D#!\u0013\u0015\t\u000b5\u0003A\u0011\u0001\u0011\u0002EQ,7\u000f\u001e#fG&l\u0017\r\\(o'&t7NR;oGRLwN\u001c+bE2,7+\u001b8lQ\ta\u0005\u0006C\u0003Q\u0001\u0011\u0005\u0001%\u0001\u0012uKN$H)Z2j[\u0006dwJ\\(viB,HOR8s[\u0006$H+\u00192mKNKgn\u001b\u0015\u0003\u001f\"BQa\u0015\u0001\u0005\u0002\u0001\n1\u0005^3ti\u000eC\u0017M\\4fY><7k\\;sG\u0016\fe\u000eZ\"iC:<W\r\\8h'&t7\u000e\u000b\u0002SQ!)a\u000b\u0001C\u0001A\u0005\u0019B/Z:u\u001d>$h*\u001e7m\u000b:4wN]2fe\"\u0012Q\u000b\u000b\u0005\u00063\u0002!\t\u0001I\u0001\u0010i\u0016\u001cHoU5oW\u000e{g\u000e^3yi\"\u0012\u0001\f\u000b\u0005\u00069\u0002!\t\u0001I\u0001\u001ai\u0016\u001cH/T3uC\u0012\fG/Y*pkJ\u001cW-\u00118e'&t7\u000e\u000b\u0002\\Q!)q\f\u0001C\u0001A\u0005iB/Z:u!\u0006\u0014\u0018\r\u001c7fY&\u001cXnV5uQ\u0012\u000bG/Y*ue\u0016\fW\u000e\u000b\u0002_Q!)!\r\u0001C\u0001A\u0005yB/Z:u!\u0006\u0014\u0018\r\u001c7fY&\u001cXnV5uQNKgn\u001b$v]\u000e$\u0018n\u001c8)\u0005\u0005D\u0003\"B3\u0001\t\u0003\u0001\u0013a\b;fgR\u0004\u0016M]1mY\u0016d\u0017n]7XSRDw*\u001e;qkR4uN]7bi\"\u0012A\r\u000b\u0005\u0006Q\u0002!\t\u0001I\u0001\u001fi\u0016\u001cH\u000fU1sC2dW\r\\5t[>s7\t[1oO\u0016dwnZ'pI\u0016D#a\u001a\u0015\t\u000b-\u0004A\u0011\u00027\u0002/%tg.\u001a:UKN$8+\u001a;QCJ\fG\u000e\\3mSNlG\u0003B\u0011nmnDQA\u001c6A\u0002=\f\u0001\u0002\u001d:pm&$WM\u001d\t\u0003aNt!AI9\n\u0005I\u001c\u0013A\u0002)sK\u0012,g-\u0003\u0002uk\n11\u000b\u001e:j]\u001eT!A]\u0012\t\u000b]T\u0007\u0019\u0001=\u0002\u0017A\f'/\u00197mK2L7/\u001c\t\u0003EeL!A_\u0012\u0003\u0007%sG\u000fC\u0003}U\u0002\u0007\u00010A\u0003j]\u0012,\u0007\u0010C\u0003\u007f\u0001\u0011%q0A\u0007m_\u000e\fG\u000eR1uKRKW.\u001a\u000b\u0005\u0003\u0003\t\t\u0002\u0005\u0003\u0002\u0004\u00055QBAA\u0003\u0015\u0011\t9!!\u0003\u0002\tQLW.\u001a\u0006\u0003\u0003\u0017\tAA[1wC&!\u0011qBA\u0003\u00055aunY1m\t\u0006$X\rV5nK\"9\u00111C?A\u0002\u0005U\u0011aC3q_\u000eD7+Z2p]\u0012\u00042AIA\f\u0013\r\tIb\t\u0002\u0005\u0019>tw\r")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.class */
public class TableSinkITCase extends StreamingTestBase {
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("id");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("num");
    private static Symbol symbol$3 = Symbol$.MODULE$.apply("text");
    private static Symbol symbol$4 = Symbol$.MODULE$.apply("rowtime");
    private static Symbol symbol$5 = Symbol$.MODULE$.apply("w");
    private static Symbol symbol$6 = Symbol$.MODULE$.apply("t");
    private static Symbol symbol$7 = Symbol$.MODULE$.apply("icnt");
    private static Symbol symbol$8 = Symbol$.MODULE$.apply("nsum");
    private static Symbol symbol$9 = Symbol$.MODULE$.apply("a");
    private static Symbol symbol$10 = Symbol$.MODULE$.apply("b");
    private static Symbol symbol$11 = Symbol$.MODULE$.apply("c");
    private static Symbol symbol$12 = Symbol$.MODULE$.apply("d");
    private static Symbol symbol$13 = Symbol$.MODULE$.apply("e");
    private static Symbol symbol$14 = Symbol$.MODULE$.apply("f");
    private static Symbol symbol$15 = Symbol$.MODULE$.apply("g");
    private static Symbol symbol$16 = Symbol$.MODULE$.apply("h");
    private static Symbol symbol$17 = Symbol$.MODULE$.apply("len");
    private static Symbol symbol$18 = Symbol$.MODULE$.apply("cTrue");
    private static Symbol symbol$19 = Symbol$.MODULE$.apply("count");
    private static Symbol symbol$20 = Symbol$.MODULE$.apply("lencnt");
    private static Symbol symbol$21 = Symbol$.MODULE$.apply("window_end");
    private static Symbol symbol$22 = Symbol$.MODULE$.apply("wend");
    private static Symbol symbol$23 = Symbol$.MODULE$.apply("cnt");
    private static Symbol symbol$24 = Symbol$.MODULE$.apply("rowtime1");
    private static Symbol symbol$25 = Symbol$.MODULE$.apply("rowtime2");

    @Test
    public void testAppendSinkOnAppendTable() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$15(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$7(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE appendSink (\n         |  `t` TIMESTAMP(3),\n         |  `icnt` BIGINT,\n         |  `nsum` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).millis()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).end()).as(symbol$6, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$7, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$2).sum()).as(symbol$8, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("appendSink").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1970-01-01T00:00:00.005,4,8", "1970-01-01T00:00:00.010,5,18", "1970-01-01T00:00:00.015,5,24", "1970-01-01T00:00:00.020,5,29", "1970-01-01T00:00:00.025,2,12"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("appendSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAppendSinkWithNestedRow() {
        tEnv().createTemporaryView("src", package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.smallTupleData3(), new TableSinkITCase$$anon$16(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)})));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE appendSink (\n         |  `t` INT,\n         |  `item` ROW<`num` BIGINT, `text` STRING>\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        tEnv().executeSql("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1,1,Hi", "2,2,Hello", "3,2,Hello world"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("appendSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testAppendSinkOnAppendTableForInnerJoin() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.smallTupleData3(), new TableSinkITCase$$anon$17(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$9), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$10), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$11)}));
        Table table2 = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData5(), new TableSinkITCase$$anon$18(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$12), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$13), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$14), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$15), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$16)}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE appendSink (\n         |  `c` STRING,\n         |  `g` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.join(table2).where(org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$10).$eq$eq$eq(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$13))).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$11), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$15)}).executeInsert("appendSink").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("appendSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testRetractSinkOnUpdatingTable() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$19(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$8(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE retractSink (\n         |  `len` INT,\n         |  `icnt` BIGINT,\n         |  `nsum` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$3).charLength()).as(symbol$17, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$17)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$17), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$7, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$2).sum()).as(symbol$8, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("retractSink").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"2,1,1", "5,1,2", "11,1,2", "25,1,3", "10,7,39", "14,1,3", "9,9,41"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("retractSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testRetractSinkOnAppendTable() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$20(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$9(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE retractSink (\n         |  `t` TIMESTAMP(3),\n         |  `icnt` BIGINT,\n         |  `nsum` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).millis()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).end()).as(symbol$6, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$7, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$2).sum()).as(symbol$8, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("retractSink").await();
        Assert.assertFalse("Received retraction messages for append only table", JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getRawResults("retractSink")).exists(new TableSinkITCase$$anonfun$testRetractSinkOnAppendTable$1(this)));
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1970-01-01T00:00:00.005,4,8", "1970-01-01T00:00:00.010,5,18", "1970-01-01T00:00:00.015,5,24", "1970-01-01T00:00:00.020,5,29", "1970-01-01T00:00:00.025,2,12"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("retractSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testUpsertSinkOnNestedAggregation() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$21(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$10(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE upsertSink (\n         |  `cnt` BIGINT,\n         |  `lencnt` BIGINT,\n         |  `cTrue` BOOLEAN,\n         |  PRIMARY KEY (cnt, cTrue) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$3).charLength()).as(symbol$17, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations(org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).$greater(org.apache.flink.table.api.package$.MODULE$.int2Literal(0))).as(symbol$18, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$17), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$18)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$17), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$19, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$18)}).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$19), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$18)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$19), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$17).count()).as(symbol$20, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$18)}).executeInsert("upsertSink").await();
        Assert.assertTrue("Results must include delete messages", JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getRawResults("upsertSink")).exists(new TableSinkITCase$$anonfun$testUpsertSinkOnNestedAggregation$1(this)));
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1,5,true", "7,1,true", "9,1,true"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("upsertSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testUpsertSinkOnAppendingTable() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$22(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$11(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE upsertSink (\n         |  `num` BIGINT,\n         |  `wend` TIMESTAMP(3),\n         |  `icnt` BIGINT,\n         |  PRIMARY KEY (num, wend, icnt) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).millis()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).end()).as(symbol$21, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$7, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("upsertSink").await();
        Assert.assertFalse("Received retraction messages for append only table", JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getRawResults("upsertSink")).exists(new TableSinkITCase$$anonfun$testUpsertSinkOnAppendingTable$1(this)));
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1,1970-01-01T00:00:00.005,1", "2,1970-01-01T00:00:00.005,2", "3,1970-01-01T00:00:00.005,1", "3,1970-01-01T00:00:00.010,2", "4,1970-01-01T00:00:00.010,3", "4,1970-01-01T00:00:00.015,1", "5,1970-01-01T00:00:00.015,4", "5,1970-01-01T00:00:00.020,1", "6,1970-01-01T00:00:00.020,4", "6,1970-01-01T00:00:00.025,2"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("upsertSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testUpsertSinkOnAppendingTableWithoutFullKey1() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$23(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$12(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE upsertSink (\n         |  `wend` TIMESTAMP(3),\n         |  `icnt` BIGINT,\n         |  PRIMARY KEY (wend, icnt) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).millis()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).end()).as(symbol$22, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$23, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("upsertSink").await();
        List<String> rawResults = TestValuesTableFactory.getRawResults("upsertSink");
        Assert.assertFalse("Received retraction messages for append only table", JavaConversions$.MODULE$.asScalaBuffer(rawResults).exists(new TableSinkITCase$$anonfun$testUpsertSinkOnAppendingTableWithoutFullKey1$1(this)));
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"+I(1970-01-01T00:00:00.005,1)", "+I(1970-01-01T00:00:00.005,2)", "+I(1970-01-01T00:00:00.005,1)", "+I(1970-01-01T00:00:00.010,2)", "+I(1970-01-01T00:00:00.010,3)", "+I(1970-01-01T00:00:00.015,1)", "+I(1970-01-01T00:00:00.015,4)", "+I(1970-01-01T00:00:00.020,1)", "+I(1970-01-01T00:00:00.020,4)", "+I(1970-01-01T00:00:00.025,2)"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(rawResults).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testUpsertSinkOnAppendingTableWithoutFullKey2() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$24(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$13(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE upsertSink (\n         |  `num` BIGINT,\n         |  `cnt` BIGINT,\n         |  PRIMARY KEY (num) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).millis()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$23, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).executeInsert("upsertSink").await();
        List<String> rawResults = TestValuesTableFactory.getRawResults("upsertSink");
        Assert.assertFalse("Received retraction messages for append only table", JavaConversions$.MODULE$.asScalaBuffer(rawResults).exists(new TableSinkITCase$$anonfun$testUpsertSinkOnAppendingTableWithoutFullKey2$1(this)));
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"+I(1,1)", "+I(2,2)", "+I(3,1)", "+I(3,2)", "+I(4,3)", "+I(4,1)", "+I(5,4)", "+I(5,1)", "+I(6,4)", "+I(6,2)"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(rawResults).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testUpsertSinkWithFilter() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$25(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$14(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3)}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE upsertSink (\n         |  `num` BIGINT,\n         |  `cnt` BIGINT,\n         |  PRIMARY KEY (num) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        table.groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$1).count()).as(symbol$23, Predef$.MODULE$.wrapRefArray(new Symbol[0]))}).where(org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$23).$less$eq(org.apache.flink.table.api.package$.MODULE$.int2Literal(3))).executeInsert("upsertSink").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1,1", "2,2", "3,3"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("upsertSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testMultiRowtime() {
        Table table = package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$26(this)).assignAscendingTimestamps(new TableSinkITCase$$anonfun$15(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).rowtime()}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE sink (\n         |  `num` BIGINT,\n         |  `ts1` TIMESTAMP(3),\n         |  `ts2` TIMESTAMP(3)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        Table select = table.window(Tumble.over((Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(5).milli()).on(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$4)).as(org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5))).groupBy(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5)}).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()).as(symbol$24, Predef$.MODULE$.wrapRefArray(new Symbol[0])), org.apache.flink.table.api.package$.MODULE$.WithOperations((Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$5).rowtime()).as(symbol$25, Predef$.MODULE$.wrapRefArray(new Symbol[0]))});
        thrown().expect(TableException.class);
        thrown().expectMessage("Found more than one rowtime field: [rowtime1, rowtime2] in the query when insert into 'default_catalog.default_database.sink'");
        select.executeInsert("sink");
    }

    @Test
    public void testDecimalOnSinkFunctionTableSink() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE sink (\n         |  `c` VARCHAR(5),\n         |  `b` DECIMAL(10, 0),\n         |  `d` CHAR(5)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$27(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$9), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$10), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$11)})).where(org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$9).$greater(org.apache.flink.table.api.package$.MODULE$.int2Literal(20))).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.string2Literal("12345"), (Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(55).cast(DataTypes.DECIMAL(10, 0)), (Expression) org.apache.flink.table.api.package$.MODULE$.LiteralStringExpression("12345").cast(DataTypes.CHAR(5))}).executeInsert("sink").await();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"12345,55,12345"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testDecimalOnOutputFormatTableSink() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE sink (\n         |  `c` VARCHAR(5),\n         |  `b` DECIMAL(10, 0),\n         |  `d` CHAR(5)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true',\n         |  'runtime-sink' = 'OutputFormat'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        package$.MODULE$.dataStreamConversions(env().fromCollection(TestData$.MODULE$.tupleData3(), new TableSinkITCase$$anon$28(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$9), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$10), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$11)})).where(org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$9).$greater(org.apache.flink.table.api.package$.MODULE$.int2Literal(20))).select(new Expression[]{org.apache.flink.table.api.package$.MODULE$.string2Literal("12345"), (Expression) org.apache.flink.table.api.package$.MODULE$.LiteralIntExpression(55).cast(DataTypes.DECIMAL(10, 0)), (Expression) org.apache.flink.table.api.package$.MODULE$.LiteralStringExpression("12345").cast(DataTypes.CHAR(5))}).executeInsert("sink").await();
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"12345,55,12345"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testChangelogSourceAndChangelogSink() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE orders (\n         |  product_id BIGINT,\n         |  user_name STRING,\n         |  order_price DECIMAL(18, 2)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData((Seq<Row>) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(1L), "user1", new BigDecimal("10.02")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(1L), "user2", new BigDecimal("71.2")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(1L), "user1", new BigDecimal("8.1")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), "user3", new BigDecimal("11.3")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), "user4", new BigDecimal("9.99")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), "user1", new BigDecimal("10")})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), "user3", new BigDecimal("21.03")}))})))})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE changelog_sink (\n        |  product_id BIGINT,\n        |  user_name STRING,\n        |  order_price DECIMAL(18, 2)\n        |) WITH (\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO changelog_sink\n        |SELECT product_id, user_name, SUM(order_price)\n        |FROM orders\n        |GROUP BY product_id, user_name\n        |")).stripMargin()).await();
        List<String> rawResults = TestValuesTableFactory.getRawResults("changelog_sink");
        scala.collection.immutable.List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"+I(1,user2,71.20)", "+I(1,user1,10.02)", "-U(1,user1,10.02)", "+U(1,user1,18.12)", "+I(2,user4,9.99)", "+I(2,user1,10.00)", "+I(2,user3,11.30)", "-U(2,user3,11.30)", "+U(2,user3,32.33)"}));
        Assert.assertEquals(apply.sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(rawResults).sorted(Ordering$String$.MODULE$));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n        |CREATE TABLE changelog_source (\n        |  product_id BIGINT,\n        |  user_name STRING,\n        |  price DECIMAL(18, 2)\n        |) WITH (\n        |  'connector' = 'values',\n        |  'data-id' = '", "',\n        |  'changelog-mode' = 'I,UB,UA,D'\n        |)\n        |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData((Seq<Row>) apply.map(new TableSinkITCase$$anonfun$16(this), List$.MODULE$.canBuildFrom()))})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE final_sink (\n        |  user_name STRING,\n        |  total_pay DECIMAL(18, 2),\n        |  PRIMARY KEY (user_name) NOT ENFORCED\n        |) WITH (\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO final_sink\n        |SELECT user_name, SUM(price) as total_pay\n        |FROM changelog_source\n        |GROUP BY user_name\n        |")).stripMargin()).await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"user1,28.12", "user2,71.20", "user3,32.33", "user4,9.99"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("final_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testNotNullEnforcer() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE nullable_src (\n         |  category STRING,\n         |  shopId INT,\n         |  num INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData(TestData$.MODULE$.nullData4())})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE not_null_sink (\n         |  category STRING,\n         |  shopId INT,\n         |  num INT NOT NULL\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        try {
            tEnv().executeSql("INSERT INTO not_null_sink SELECT * FROM nullable_src").await();
            Assert.fail("Execution should fail.");
        } catch (Throwable th) {
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(th, "Column 'num' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.").isPresent());
        }
        tEnv().getConfig().getConfiguration().setString("table.exec.sink.not-null-enforcer", "drop");
        tEnv().executeSql("INSERT INTO not_null_sink SELECT * FROM nullable_src").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"book,1,12", "book,4,11", "fruit,3,44"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("not_null_sink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testSinkContext() {
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE src (\n         |  log_ts STRING,\n         |  ts TIMESTAMP(3),\n         |  a INT,\n         |  b DOUBLE,\n         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "'\n         |)\n      "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData((Seq<Row>) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.001", localDateTime(1L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(1.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.002", localDateTime(2L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(2.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.003", localDateTime(3L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(2.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.004", localDateTime(4L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(5.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.007", localDateTime(7L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(3.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.008", localDateTime(8L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(3.0d)})), rowOf(Predef$.MODULE$.genericWrapArray(new Object[]{"1970-01-01 00:00:00.016", localDateTime(16L), BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToDouble(4.0d)}))})))})))).stripMargin();
        String stripMargin2 = new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE sink (\n         |  log_ts STRING,\n         |  ts TIMESTAMP(3),\n         |  a INT,\n         |  b DOUBLE\n         |) WITH (\n         |  'connector' = 'values',\n         |  'table-sink-class' = '", "'\n         |)\n      "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.TestSinkContextTableSink.class.getName()})))).stripMargin();
        tEnv().executeSql(stripMargin);
        tEnv().executeSql(stripMargin2);
        tEnv().executeSql("INSERT INTO sink SELECT * FROM src").await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{1000, 2000, 3000, 4000, 7000, 8000, 16000})).sorted(Ordering$Int$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.TestSinkContextTableSink.ROWTIMES).sorted(Ordering$.MODULE$.ordered(Predef$.MODULE$.$conforms())));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE sink2 (\n         |  window_rowtime TIMESTAMP(3),\n         |  b DOUBLE\n         |) WITH (\n         |  'connector' = 'values',\n         |  'table-sink-class' = '", "'\n         |)\n      "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.TestSinkContextTableSink.class.getName()})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink2\n        |SELECT\n        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),\n        |  SUM(b)\n        |FROM src\n        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)\n        |")).stripMargin()).await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{4999, 9999, 19999})).sorted(Ordering$Int$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.TestSinkContextTableSink.ROWTIMES).sorted(Ordering$.MODULE$.ordered(Predef$.MODULE$.$conforms())));
    }

    @Test
    public void testMetadataSourceAndSink() {
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE MetadataSource (\n         |  category STRING,\n         |  shopId INT,\n         |  num BIGINT METADATA FROM 'metadata_1'\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'readable-metadata' = 'metadata_1:INT'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TestValuesTableFactory.registerData(TestData$.MODULE$.nullData4())})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE MetadataSink (\n         |  category STRING METADATA FROM 'metadata_1',\n         |  shopId INT,\n         |  metadata_3 BIGINT METADATA VIRTUAL,\n         |  num STRING METADATA FROM 'metadata_2'\n         |) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:INT, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:INT'\n         |)\n         |"})).s(Nil$.MODULE$))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n           |INSERT INTO MetadataSink\n           |SELECT category, shopId, CAST(num AS STRING)\n           |FROM MetadataSource\n           |"})).s(Nil$.MODULE$))).stripMargin()).await();
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"1,book,12", "2,book,null", "3,fruit,44", "4,book,11", "4,fruit,null", "5,fruit,null"})).sorted(Ordering$String$.MODULE$), JavaConversions$.MODULE$.asScalaBuffer(TestValuesTableFactory.getResults("MetadataSink")).sorted(Ordering$String$.MODULE$));
    }

    @Test
    public void testParallelismWithDataStream() {
        Failure apply = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$1(this));
        if (apply instanceof Success) {
            Assert.fail("this should not happen");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply.exception(), "`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`").isPresent());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    @Test
    public void testParallelismWithSinkFunction() {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        Failure apply = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$2(this, Integer.MAX_VALUE, atomicInteger));
        if (apply instanceof Success) {
            Assert.fail("this should not happen");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply.exception(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Failed to wait job finish"})).s(Nil$.MODULE$)).isPresent());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Failure apply2 = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$3(this, -1, atomicInteger));
        if (apply2 instanceof Success) {
            Assert.fail("this should not happen");
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            if (!(apply2 instanceof Failure)) {
                throw new MatchError(apply2);
            }
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply2.exception(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"should not be less than zero or equal to zero"})).s(Nil$.MODULE$)).isPresent());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Assert.assertTrue(Try$.MODULE$.apply(new TableSinkITCase$$anonfun$testParallelismWithSinkFunction$1(this, 1, atomicInteger)).isSuccess());
    }

    @Test
    public void testParallelismWithOutputFormat() {
        AtomicInteger atomicInteger = new AtomicInteger(1);
        Failure apply = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$4(this, -1, atomicInteger));
        if (apply instanceof Success) {
            Assert.fail("this should not happen");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply.exception(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"should not be less than zero or equal to zero"})).s(Nil$.MODULE$)).isPresent());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Failure apply2 = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$5(this, Integer.MAX_VALUE, atomicInteger));
        if (apply2 instanceof Success) {
            Assert.fail("this should not happen");
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            if (!(apply2 instanceof Failure)) {
                throw new MatchError(apply2);
            }
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply2.exception(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Failed to wait job finish"})).s(Nil$.MODULE$)).isPresent());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        Assert.assertTrue(Try$.MODULE$.apply(new TableSinkITCase$$anonfun$testParallelismWithOutputFormat$1(this, 1, atomicInteger)).isSuccess());
    }

    @Test
    public void testParallelismOnChangelogMode() {
        String registerData = TestValuesTableFactory.registerData(TestData$.MODULE$.data1());
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test_para_source"})).s(Nil$.MODULE$);
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test_para_sink_without_pk"})).s(Nil$.MODULE$);
        String s3 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test_para_sink_with_pk"})).s(Nil$.MODULE$);
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE ", " (\n         |  the_month INT,\n         |  area STRING,\n         |  product INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'bounded' = 'true'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s, registerData})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE ", " (\n         |  the_month INT,\n         |  area STRING,\n         |  product INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'runtime-sink' = 'SinkFunction',\n         |  'sink.parallelism' = '", "',\n         |  'sink-changelog-mode-enforced' = 'I,D'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s2, BoxesRunTime.boxToInteger(2)})))).stripMargin());
        Failure apply = Try$.MODULE$.apply(new TableSinkITCase$$anonfun$6(this, s, s2));
        if (!(apply instanceof Failure)) {
            throw new MatchError(apply);
        }
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(apply.exception(), "primary key is required but no primary key is found").isPresent());
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE ", " (\n         |  the_month INT,\n         |  area STRING,\n         |  product INT,\n         |  PRIMARY KEY (area) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'runtime-sink' = 'SinkFunction',\n         |  'sink.parallelism' = '", "',\n         |  'sink-changelog-mode-enforced' = 'I,D'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s3, BoxesRunTime.boxToInteger(2)})))).stripMargin());
        Assert.assertTrue(Try$.MODULE$.apply(new TableSinkITCase$$anonfun$testParallelismOnChangelogMode$1(this, s, s3)).isSuccess());
    }

    public void org$apache$flink$table$planner$runtime$stream$table$TableSinkITCase$$innerTestSetParallelism(String str, int i, int i2) {
        String registerData = TestValuesTableFactory.registerData(TestData$.MODULE$.data1());
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test_para_source_", "_", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str.toLowerCase().trim(), BoxesRunTime.boxToInteger(i2)}));
        String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"test_para_sink_", "_", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str.toLowerCase().trim(), BoxesRunTime.boxToInteger(i2)}));
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE ", " (\n         |  the_month INT,\n         |  area STRING,\n         |  product INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '", "',\n         |  'bounded' = 'true'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s, registerData})))).stripMargin());
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n         |CREATE TABLE ", " (\n         |  the_month INT,\n         |  area STRING,\n         |  product INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true',\n         |  'runtime-sink' = '", "',\n         |  'sink.parallelism' = '", "'\n         |)\n         |"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s2, str, BoxesRunTime.boxToInteger(i)})))).stripMargin());
        tEnv().executeSql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"INSERT INTO ", " SELECT * FROM ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s2, s}))).await();
    }

    private LocalDateTime localDateTime(long j) {
        return LocalDateTime.ofEpochSecond(j, 0, ZoneOffset.UTC);
    }
}
