package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.List;
import org.apache.flink.table.api.bridge.scala.StreamStatementSet;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: TableSinkITCase.scala */
@ExtendWith({ParameterizedTestExtension.class})
@ScalaSignature(bytes = "\u0006\u0001\u0005\ra\u0001B\u0001\u0003\u0001M\u0011q\u0002V1cY\u0016\u001c\u0016N\\6J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tqA];oi&lWM\u0003\u0002\n\u0015\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0006\r\u0003\u0015!\u0018M\u00197f\u0015\tia\"A\u0003gY&t7N\u0003\u0002\u0010!\u00051\u0011\r]1dQ\u0016T\u0011!E\u0001\u0004_J<7\u0001A\n\u0003\u0001Q\u0001\"!\u0006\r\u000e\u0003YQ!a\u0006\u0004\u0002\u000bU$\u0018\u000e\\:\n\u0005e1\"AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\t5|G-\u001a\t\u0003;Er!AH\u0018\u000f\u0005}qcB\u0001\u0011.\u001d\t\tCF\u0004\u0002#W9\u00111E\u000b\b\u0003I%r!!\n\u0015\u000e\u0003\u0019R!a\n\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012BA\b\u0011\u0013\tia\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003\u000f!I!a\u0006\u0004\n\u0005A2\u0012AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017B\u0001\u001a4\u0005A\u0019F/\u0019;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u00021-!)Q\u0007\u0001C\u0001m\u00051A(\u001b8jiz\"\"aN\u001d\u0011\u0005a\u0002Q\"\u0001\u0002\t\u000bm!\u0004\u0019\u0001\u000f\t\u000bm\u0002A\u0011\t\u001f\u0002\r\t,gm\u001c:f)\u0005i\u0004C\u0001 
B\u001b\u0005y$\"\u0001!\u0002\u000bM\u001c\u0017\r\\1\n\u0005\t{$\u0001B+oSRD#A\u000f#\u0011\u0005\u0015cU\"\u0001$\u000b\u0005\u001dC\u0015aA1qS*\u0011\u0011JS\u0001\bUV\u0004\u0018\u000e^3s\u0015\tY\u0005#A\u0003kk:LG/\u0003\u0002N\r\nQ!)\u001a4pe\u0016,\u0015m\u00195\t\u000b=\u0003A\u0011\u0001\u001f\u00023Q,7\u000f\u001e&pS:$\u0015n]8sI\u0016\u00148\t[1oO\u0016dun\u001a\u0015\u0003\u001dF\u0003\"!\u0012*\n\u0005M3%\u0001\u0004+fgR$V-\u001c9mCR,\u0007\"B+\u0001\t\u0003a\u0014!\u0007;fgR\u001c\u0016N\\6ESN|'\u000fZ3s\u0007\"\fgnZ3M_\u001eD#\u0001V)\t\u000ba\u0003A\u0011\u0001\u001f\u0002CQ,7\u000f^*j].$\u0015n]8sI\u0016\u00148\t[1oO\u0016dunZ,ji\"\u0014\u0016M\\6)\u0005]\u000b\u0006\"B.\u0001\t\u0003a\u0014A\u0010;fgR\u001c\u0005.\u00198hK2|wmU8ve\u000e,w+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e4UO\\2TS:\\w+\u001b;i\t&4g-\u001a:f]R\u00046\u000e\u000b\u0002[#\")a\f\u0001C\u0001y\u0005!B/Z:u\u0013:\u001cXM\u001d;QCJ$8i\u001c7v[:D#!X)\t\u000b\u0005\u0004A\u0011\u0001\u001f\u0002/Q,7\u000f^\"sK\u0006$X\rV1cY\u0016\f5oU3mK\u000e$\bF\u00011R\u0011\u0015!\u0007\u0001\"\u0001=\u0003\u0011\"Xm\u001d;De\u0016\fG/\u001a+bE2,\u0017i]*fY\u0016\u001cGoV5uQN{'\u000f\u001e'j[&$\bFA2R\u0011\u00159\u0007\u0001\"\u0001=\u0003\u0015\"Xm\u001d;De\u0016\fG/\u001a+bE2,\u0017i]*fY\u0016\u001cGoV5uQ>,Ho\u00149uS>t7\u000f\u000b\u0002g#\")!\u000e\u0001C\u0001y\u0005\tB/Z:u!\u0006\u0014H/[1m\u0013:\u001cXM\u001d;)\u0005%\f\u0006\u0006\u0002\u0001ngR\u0004\"A\\9\u000e\u0003=T!\u0001\u001d$\u0002\u0013\u0015DH/\u001a8tS>t\u0017B\u0001:p\u0005))\u0005\u0010^3oI^KG\u000f[\u0001\u0006m\u0006dW/\u001a\u0017\u0002k\u000e\na\u000f\u0005\u0002x\u007f6\t\u0001P\u0003\u0002zu\u0006i\u0001/\u0019:b[\u0016$XM]5{K\u0012T!a\u001f?\u0002\u0015\u0015DH/\u001a8tS>t7O\u0003\u0002L{*\u0011a\u0010D\u0001\ni\u0016\u001cH/\u001e;jYNL1!!\u0001y\u0005i\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^#yi\u0016t7/[8o\u0001")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.class */
public class TableSinkITCase extends StreamingWithStateTestBase {
    /**
     * Sets up the source tables shared by the tests in this class.
     *
     * <p>Runs the parent setup first, then creates four {@code 'values'}-connector tables whose
     * rows are registered through {@code TestValuesTableFactory.registerData}:
     * <ul>
     *   <li>{@code src(person, votes)}: four identical {@code (jason, 1)} rows;</li>
     *   <li>{@code award(votes, prize)}: four rows, primary key {@code votes};</li>
     *   <li>{@code people(person, age)}: one row, primary key {@code person};</li>
     *   <li>{@code users}: rows from {@code TestData$.userChangelog()}, declared with
     *       {@code 'changelog-mode' = 'I,UA,UB,D'}.</li>
     * </ul>
     */
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase, org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @BeforeEach
    public void before() {
        super.before();

        // --- src -------------------------------------------------------------------------------
        Seq<Row> srcRows = (Seq<Row>) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{"jason", BoxesRunTime.boxToLong(1L)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{"jason", BoxesRunTime.boxToLong(1L)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{"jason", BoxesRunTime.boxToLong(1L)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{"jason", BoxesRunTime.boxToLong(1L)}))
        }));
        String srcDataId = TestValuesTableFactory.registerData(srcRows);
        String srcDdl =
                "\n                       |CREATE TABLE src (person String, votes BIGINT) WITH("
                        + "\n                       |  'connector' = 'values',"
                        + "\n                       |  'data-id' = '" + srcDataId + "'"
                        + "\n                       |)"
                        + "\n                       |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(srcDdl)).stripMargin());

        // --- award -----------------------------------------------------------------------------
        Seq<Row> awardRows = (Seq<Row>) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(5.2d)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToDouble(12.1d)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToDouble(18.3d)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToDouble(22.5d)}))
        }));
        String awardDataId = TestValuesTableFactory.registerData(awardRows);
        String awardDdl =
                "\n         |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH("
                        + "\n         |  'connector' = 'values',"
                        + "\n         |  'data-id' = '" + awardDataId + "'"
                        + "\n         |)"
                        + "\n         |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(awardDdl)).stripMargin());

        // --- people ----------------------------------------------------------------------------
        Seq<Row> peopleRows = (Seq<Row>) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{"jason", BoxesRunTime.boxToInteger(22)}))
        }));
        String peopleDataId = TestValuesTableFactory.registerData(peopleRows);
        String peopleDdl =
                "\n         |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH("
                        + "\n         |  'connector' = 'values',"
                        + "\n         |  'data-id' = '" + peopleDataId + "'"
                        + "\n         |)"
                        + "\n         |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(peopleDdl)).stripMargin());

        // --- users (changelog source) ----------------------------------------------------------
        String usersDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.userChangelog());
        String usersDdl =
                "\n                       |CREATE TABLE users ("
                        + "\n                       |  user_id STRING,"
                        + "\n                       |  user_name STRING,"
                        + "\n                       |  email STRING,"
                        + "\n                       |  balance DECIMAL(18,2),"
                        + "\n                       |  primary key (user_id) not enforced"
                        + "\n                       |) WITH ("
                        + "\n                       | 'connector' = 'values',"
                        + "\n                       | 'data-id' = '" + usersDataId + "',"
                        + "\n                       | 'changelog-mode' = 'I,UA,UB,D'"
                        + "\n                       |)"
                        + "\n                       |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(usersDdl)).stripMargin());
    }

    /**
     * Joins the per-person vote total from {@code src} with {@code award} and {@code people},
     * writes the result into the {@code JoinDisorderChangeLog} sink (primary key {@code person})
     * and asserts that the sink ends up holding exactly {@code +I[jason, 4, 22.5, 22]}.
     */
    @TestTemplate
    public void testJoinDisorderChangeLog() {
        String sinkDdl =
                "\n                      |CREATE TABLE JoinDisorderChangeLog ("
                        + "\n                      |  person STRING, votes BIGINT, prize DOUBLE, age INT,"
                        + "\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'false'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        String insertSql =
                "\n                    |INSERT INTO JoinDisorderChangeLog"
                        + "\n                    |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM"
                        + "\n                    | (SELECT T.person, T.sum_votes, award.prize FROM"
                        + "\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T,"
                        + "\n                    |   award"
                        + "\n                    |   WHERE T.sum_votes = award.votes) T1, people T2"
                        + "\n                    | WHERE T1.person = T2.person"
                        + "\n                    |";
        // Block until the insert job has finished before reading the sink.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertSql)).stripMargin()).await();

        List<String> sinkRows = TestValuesTableFactory.getResultsAsStrings("JoinDisorderChangeLog");
        $colon.colon expectedRows = new $colon.colon("+I[jason, 4, 22.5, 22]", Nil$.MODULE$);
        // Both sides are sorted so the comparison does not depend on emission order.
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(sinkRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Joins the per-person vote total from {@code src} with {@code award}, writes the result into
     * the {@code SinkDisorderChangeLog} sink (primary key {@code person}) and asserts that the
     * sink ends up holding exactly {@code +I[jason, 4, 22.5]}.
     */
    @TestTemplate
    public void testSinkDisorderChangeLog() {
        String sinkDdl =
                "\n                      |CREATE TABLE SinkDisorderChangeLog ("
                        + "\n                      |  person STRING, votes BIGINT, prize DOUBLE,"
                        + "\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'false'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        String insertSql =
                "\n                    |INSERT INTO SinkDisorderChangeLog"
                        + "\n                    |SELECT T.person, T.sum_votes, award.prize FROM"
                        + "\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award"
                        + "\n                    |   WHERE T.sum_votes = award.votes"
                        + "\n                    |";
        // Block until the insert job has finished before reading the sink.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertSql)).stripMargin()).await();

        List<String> sinkRows = TestValuesTableFactory.getResultsAsStrings("SinkDisorderChangeLog");
        $colon.colon expectedRows = new $colon.colon("+I[jason, 4, 22.5]", Nil$.MODULE$);
        // Both sides are sorted so the comparison does not depend on emission order.
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(sinkRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Ranks the per-person vote totals from {@code src} with {@code ROW_NUMBER()} partitioned by
     * {@code SUM(votes) / 2}, writes the rows with {@code rank_number < 10} into the
     * {@code SinkRankChangeLog} sink (primary key {@code person}) and asserts that the sink ends
     * up holding exactly {@code +I[jason, 4]}.
     */
    @TestTemplate
    public void testSinkDisorderChangeLogWithRank() {
        String sinkDdl =
                "\n                      |CREATE TABLE SinkRankChangeLog ("
                        + "\n                      |  person STRING, votes BIGINT,"
                        + "\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'false'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        String insertSql =
                "\n          |INSERT INTO SinkRankChangeLog"
                        + "\n          |SELECT person, sum_votes FROM"
                        + "\n          | (SELECT person, sum_votes,"
                        + "\n          |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number"
                        + "\n          |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src"
                        + "\n          |      GROUP BY person))"
                        + "\n          |   WHERE rank_number < 10"
                        + "\n          |";
        // Block until the insert job has finished before reading the sink.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertSql)).stripMargin()).await();

        List<String> sinkRows = TestValuesTableFactory.getResultsAsStrings("SinkRankChangeLog");
        $colon.colon expectedRows = new $colon.colon("+I[jason, 4]", Nil$.MODULE$);
        // Both sides are sorted so the comparison does not depend on emission order.
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(sinkRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Copies the {@code users} changelog table into {@code sink_with_pk}, whose primary key is
     * {@code email} rather than the source's {@code user_id}, while passing {@code user_name}
     * through the registered {@code ndFunc} ({@code TestNonDeterministicUdf}).
     *
     * <p>Two things are asserted: the final sink content (order-insensitive) and the raw
     * sequence of changelog rows the sink received (order-sensitive).
     */
    @TestTemplate
    public void testChangelogSourceWithNonDeterministicFuncSinkWithDifferentPk() {
        tEnv().createTemporaryFunction("ndFunc", new TestNonDeterministicUdf());

        String sinkDdl =
                "\n                      |CREATE TABLE sink_with_pk ("
                        + "\n                      |  user_id STRING,"
                        + "\n                      |  user_name STRING,"
                        + "\n                      |  email STRING,"
                        + "\n                      |  balance DECIMAL(18,2),"
                        + "\n                      |  PRIMARY KEY(email) NOT ENFORCED"
                        + "\n                      |) WITH("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'false'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        String insertSql =
                "\n                     |insert into sink_with_pk"
                        + "\n                     |select user_id, SPLIT_INDEX(ndFunc(user_name), '-', 0), email, balance"
                        + "\n                     |from users"
                        + "\n                     |";
        // Block until the insert job has finished before reading the sink.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertSql)).stripMargin()).await();

        // Final sink content, compared after sorting both sides.
        List<String> sinkRows = TestValuesTableFactory.getResultsAsStrings("sink_with_pk");
        $colon.colon expectedRows =
                new $colon.colon("+I[user1, Tom, tom123@gmail.com, 8.10]",
                        new $colon.colon("+I[user3, Bailey, bailey@qq.com, 9.99]",
                                new $colon.colon("+I[user4, Tina, tina@gmail.com, 11.30]", Nil$.MODULE$)));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(sinkRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));

        // Raw changelog rows as received by the sink; compared in order, without sorting.
        String[] expectedRawRows = new String[]{
            "+I[user1, Tom, tom@gmail.com, 10.02]",
            "+I[user2, Jack, jack@hotmail.com, 71.20]",
            "-D[user1, Tom, tom@gmail.com, 10.02]",
            "+I[user1, Tom, tom123@gmail.com, 8.10]",
            "+I[user3, Bailey, bailey@gmail.com, 9.99]",
            "-D[user2, Jack, jack@hotmail.com, 71.20]",
            "+I[user4, Tina, tina@gmail.com, 11.30]",
            "-D[user3, Bailey, bailey@gmail.com, 9.99]",
            "+I[user3, Bailey, bailey@qq.com, 9.99]"
        };
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getRawResultsAsStrings("sink_with_pk")).toList())
                .isEqualTo(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRawRows)));
    }

    /**
     * Inserts only the {@code person} and {@code votes} columns of {@code src} into
     * {@code zm_test}, a sink that also declares four {@code MAP} columns, and asserts that each
     * of the four resulting rows carries {@code null} in the columns that were not listed.
     */
    @TestTemplate
    public void testInsertPartColumn() {
        String sinkDdl =
                "\n                      |CREATE TABLE zm_test ("
                        + "\n                      |  `person` String,"
                        + "\n                      |  `votes` BIGINT,"
                        + "\n                      |  `m1` MAP<STRING, BIGINT>,"
                        + "\n                      |  `m2` MAP<STRING NOT NULL, BIGINT>,"
                        + "\n                      |  `m3` MAP<STRING, BIGINT NOT NULL>,"
                        + "\n                      |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>"
                        + "\n                      |) WITH ("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'true'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        String insertSql =
                "\n                    |insert into zm_test(`person`, `votes`)"
                        + "\n                    |  select"
                        + "\n                    |    `person`,"
                        + "\n                    |    `votes`"
                        + "\n                    |  from"
                        + "\n                    |    src"
                        + "\n                    |";
        // Block until the insert job has finished before reading the sink.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertSql)).stripMargin()).await();

        List<String> sinkRows = TestValuesTableFactory.getResultsAsStrings("zm_test");
        // One expected row per row of src; the four MAP columns stay null.
        $colon.colon expectedRows =
                new $colon.colon("+I[jason, 1, null, null, null, null]",
                        new $colon.colon("+I[jason, 1, null, null, null, null]",
                                new $colon.colon("+I[jason, 1, null, null, null, null]",
                                        new $colon.colon("+I[jason, 1, null, null, null, null]", Nil$.MODULE$))));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(sinkRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Exercises {@code CREATE TABLE ... AS SELECT} over {@code src} through two entry points and
     * expects the same four {@code +I[jason, 1]} rows from each:
     * <ol>
     *   <li>{@code executeSql} creating {@code MyCtasTable};</li>
     *   <li>a statement set ({@code addInsertSql} + {@code execute}) creating
     *       {@code MyCtasTableUseStatement}.</li>
     * </ol>
     */
    @TestTemplate
    public void testCreateTableAsSelect() {
        // 1) CTAS submitted directly through executeSql.
        String ctasSql =
                "\n                    |CREATE TABLE MyCtasTable"
                        + "\n                    | WITH ("
                        + "\n                    |   'connector' = 'values',"
                        + "\n                    |   'sink-insert-only' = 'true'"
                        + "\n                    |) AS"
                        + "\n                    |  SELECT"
                        + "\n                    |    `person`,"
                        + "\n                    |    `votes`"
                        + "\n                    |  FROM"
                        + "\n                    |    src"
                        + "\n                    |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ctasSql)).stripMargin()).await();

        List<String> directRows = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        // The same expected rows are reused for both checks below.
        $colon.colon expectedRows =
                new $colon.colon("+I[jason, 1]",
                        new $colon.colon("+I[jason, 1]",
                                new $colon.colon("+I[jason, 1]",
                                        new $colon.colon("+I[jason, 1]", Nil$.MODULE$))));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(directRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));

        // 2) The same CTAS shape submitted through a statement set.
        StreamStatementSet statements = tEnv().createStatementSet();
        String ctasViaStatementSetSql =
                "\n                                |CREATE TABLE MyCtasTableUseStatement"
                        + "\n                                | WITH ("
                        + "\n                                |   'connector' = 'values',"
                        + "\n                                |   'sink-insert-only' = 'true'"
                        + "\n                                |) AS"
                        + "\n                                |  SELECT"
                        + "\n                                |    `person`,"
                        + "\n                                |    `votes`"
                        + "\n                                |  FROM"
                        + "\n                                |    src"
                        + "\n                                |";
        statements.addInsertSql(new StringOps(Predef$.MODULE$.augmentString(ctasViaStatementSetSql)).stripMargin());
        statements.execute().await();

        List<String> statementSetRows = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(statementSetRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs {@code CREATE TABLE MyCtasTable ... AS} over a parenthesised query that orders
     * {@code src} by {@code votes} with {@code LIMIT 2}, and asserts that the created table holds
     * exactly two {@code +I[jason, 1]} rows.
     */
    @TestTemplate
    public void testCreateTableAsSelectWithSortLimit() {
        String ctasSql =
                "\n                    |CREATE TABLE MyCtasTable"
                        + "\n                    | WITH ("
                        + "\n                    |   'connector' = 'values',"
                        + "\n                    |   'sink-insert-only' = 'false'"
                        + "\n                    |) AS"
                        + "\n                    |  (SELECT"
                        + "\n                    |    `person`,"
                        + "\n                    |    `votes`"
                        + "\n                    |  FROM"
                        + "\n                    |    src order by `votes` LIMIT 2)"
                        + "\n                    |";
        // Block until the CTAS job has finished before reading the table.
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ctasSql)).stripMargin()).await();

        List<String> tableRows = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        $colon.colon expectedRows =
                new $colon.colon("+I[jason, 1]", new $colon.colon("+I[jason, 1]", Nil$.MODULE$));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(tableRows).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedRows.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Asserts that a {@code CREATE TABLE ... AS SELECT} statement without a {@code WITH} clause
     * is rejected by {@code executeSql} with the managed-table checkpointing error message.
     */
    @TestTemplate
    public void testCreateTableAsSelectWithoutOptions() {
        final String ctasWithoutOptions = "CREATE TABLE MyCtasTable AS SELECT `person`, `votes` FROM src";
        final String expectedMessage =
                "You should enable the checkpointing for sinking to managed table 'default_catalog.default_database.MyCtasTable', managed table relies on checkpoint to commit and the data is visible only after commit.";

        // The statement is only submitted, never awaited: the failure is expected from executeSql itself.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(ctasWithoutOptions))
                .hasMessage(expectedMessage);
    }

    /**
     * Fills the {@code test_sink} table (primary key {@code id}) from {@code test_source} with
     * two partial inserts that each list a different subset of columns.
     *
     * <p>After the first insert ({@code id, person, votes}) the sink rows are expected to have
     * {@code null} for {@code city} and {@code age}. After the second insert
     * ({@code id, city, age}) the sink rows are expected to carry all five values.
     */
    @TestTemplate
    public void testPartialInsert() {
        // Source data: three rows of (id, person, votes, city, age).
        Seq<Row> sourceRows = (Seq<Row>) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(1L), "jason", BoxesRunTime.boxToLong(3L), "X", BoxesRunTime.boxToInteger(43)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(2L), "andy", BoxesRunTime.boxToLong(2L), "Y", BoxesRunTime.boxToInteger(32)})),
            BatchTestBase$.MODULE$.row(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(3L), "clark", BoxesRunTime.boxToLong(1L), "Z", BoxesRunTime.boxToInteger(29)}))
        }));
        String sourceDataId = TestValuesTableFactory.registerData(sourceRows);
        String sourceDdl =
                "\n                       |CREATE TABLE test_source ("
                        + "\n                       |  id bigint,"
                        + "\n                       |  person String,"
                        + "\n                       |  votes bigint,"
                        + "\n                       |  city String,"
                        + "\n                       |  age int)"
                        + "\n                       |WITH ("
                        + "\n                       |  'connector' = 'values',"
                        + "\n                       |  'data-id' = '" + sourceDataId + "'"
                        + "\n                       |)"
                        + "\n                       |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sourceDdl)).stripMargin());

        String sinkDdl =
                "\n                      |CREATE TABLE test_sink ("
                        + "\n                      |  id bigint,"
                        + "\n                      |  person String,"
                        + "\n                      |  votes bigint,"
                        + "\n                      |  city String,"
                        + "\n                      |  age int,"
                        + "\n                      |  primary key(id) not enforced"
                        + "\n                      |) WITH ("
                        + "\n                      |  'connector' = 'values',"
                        + "\n                      |  'sink-insert-only' = 'false'"
                        + "\n                      |)"
                        + "\n                      |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).stripMargin());

        // First partial insert: id, person, votes.
        String insertPersonVotesSql =
                "\n                    |insert into test_sink (id, person, votes)"
                        + "\n                    |  select"
                        + "\n                    |    id,"
                        + "\n                    |    person,"
                        + "\n                    |    votes"
                        + "\n                    |  from"
                        + "\n                    |    test_source"
                        + "\n                    |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertPersonVotesSql)).stripMargin()).await();

        List<String> rowsAfterFirstInsert = TestValuesTableFactory.getResultsAsStrings("test_sink");
        $colon.colon expectedAfterFirstInsert =
                new $colon.colon("+I[1, jason, 3, null, null]",
                        new $colon.colon("+I[2, andy, 2, null, null]",
                                new $colon.colon("+I[3, clark, 1, null, null]", Nil$.MODULE$)));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(rowsAfterFirstInsert).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedAfterFirstInsert.sorted(Ordering$String$.MODULE$));

        // Second partial insert: id, city, age.
        String insertCityAgeSql =
                "\n                    |insert into test_sink (id, city, age)"
                        + "\n                    |  select"
                        + "\n                    |    id,"
                        + "\n                    |    city,"
                        + "\n                    |    age "
                        + "\n                    |  from"
                        + "\n                    |    test_source"
                        + "\n                    |";
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertCityAgeSql)).stripMargin()).await();

        List<String> rowsAfterSecondInsert = TestValuesTableFactory.getResultsAsStrings("test_sink");
        $colon.colon expectedAfterSecondInsert =
                new $colon.colon("+I[1, jason, 3, X, 43]",
                        new $colon.colon("+I[2, andy, 2, Y, 32]",
                                new $colon.colon("+I[3, clark, 1, Z, 29]", Nil$.MODULE$)));
        Assertions.assertThat(JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(rowsAfterSecondInsert).sorted(Ordering$String$.MODULE$))
                .isEqualTo(expectedAfterSecondInsert.sorted(Ordering$String$.MODULE$));
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param stateBackendMode the state backend mode, passed unchanged to the
     *     {@code StreamingWithStateTestBase} constructor
     */
    public TableSinkITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
    }
}
