/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.stream.sql;

import java.io.Serializable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment$;
import org.apache.flink.table.api.TableEnvironment$;
import org.apache.flink.table.api.Types$;
import org.apache.flink.table.api.scala.StreamTableEnvironment;
import org.apache.flink.table.api.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.runtime.stream.table.RowCollector$;
import org.apache.flink.table.runtime.stream.table.TestRetractSink;
import org.apache.flink.table.runtime.stream.table.TestUpsertSink;
import org.apache.flink.table.runtime.utils.StreamTestData$;
import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.MemoryTableSourceSinkUtil;
import org.apache.flink.table.utils.MemoryTableSourceSinkUtil$;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.SymbolLiteral;

@ScalaSignature(bytes="\u0006\u0001\t3A!\u0001\u0002\u0001#\t\u0001\u0012J\\:feRLe\u000e^8J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tqA];oi&lWM\u0003\u0002\n\u0015\u0005)A/\u00192mK*\u00111\u0002D\u0001\u0006M2Lgn\u001b\u0006\u0003\u001b9\ta!\u00199bG\",'\"A\b\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001\u0011\u0002CA\n\u0017\u001b\u0005!\"BA\u000b\u0007\u0003\u0015)H/\u001b7t\u0013\t9BC\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX\rC\u0003\u001a\u0001\u0011\u0005!$\u0001\u0004=S:LGO\u0010\u000b\u00027A\u0011A\u0004A\u0007\u0002\u0005!)a\u0004\u0001C\u0001?\u0005)C/Z:u\u0013:\u001cXM\u001d;J]R|\u0017\t\u001d9f]\u0012\u001cFO]3b[R{G+\u00192mKNKgn\u001b\u000b\u0002AA\u0011\u0011\u0005J\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t!QK\\5uQ\tir\u0005\u0005\u0002)W5\t\u0011F\u0003\u0002+\u001d\u0005)!.\u001e8ji&\u0011A&\u000b\u0002\u0005)\u0016\u001cH\u000fC\u0003/\u0001\u0011\u0005q$\u0001\u0015uKN$\u0018J\\:feRLe\u000e^8Va\u0012\fG/\u001b8h)\u0006\u0014G.\u001a+p%\u0016$(/Y2u'&t7\u000e\u000b\u0002.O!)\u0011\u0007\u0001C\u0001?\u00051C/Z:u\u0013:\u001cXM\u001d;J]R|\u0017\t\u001d9f]\u0012$\u0016M\u00197f)>\u0014V\r\u001e:bGR\u001c\u0016N\\6)\u0005A:\u0003\"\u0002\u001b\u0001\t\u0003y\u0012A\r;fgRLen]3si&sGo\\+qI\u0006$\u0018N\\4UC\ndWmV5uQ\u001a+H\u000e\\&fsR{W\u000b]:feR\u001c\u0016N\\6)\u0005M:\u0003\"B\u001c\u0001\t\u0003y\u0012\u0001\u000e;fgRLen]3si&sGo\\!qa\u0016tG-\u001b8h)\u0006\u0014G.Z,ji\"4U\u000f\u001c7LKf\fDk\\+qg\u0016\u0014HoU5oW\"\u0012ag\n\u0005\u0006u\u0001!\taH\u00015i\u0016\u001cH/\u00138tKJ$\u0018J\u001c;p\u0003B\u0004XM\u001c3j]\u001e$\u0016M\u00197f/&$\bNR;mY.+\u0017P\r+p+B\u001cXM\u001d;TS:\\\u0007FA\u001d(\u0011\u0015i\u0004\u0001\"\u0001 
\u0003]\"Xm\u001d;J]N,'\u000f^%oi>\f\u0005\u000f]3oI&tw\rV1cY\u0016<\u0016\u000e\u001e5pkR4U\u000f\u001c7LKf\fDk\\+qg\u0016\u0014HoU5oW\"\u0012Ah\n\u0005\u0006\u0001\u0002!\taH\u00018i\u0016\u001cH/\u00138tKJ$\u0018J\u001c;p\u0003B\u0004XM\u001c3j]\u001e$\u0016M\u00197f/&$\bn\\;u\rVdGnS3zeQ{W\u000b]:feR\u001c\u0016N\\6)\u0005}:\u0003")
public class InsertIntoITCase
extends StreamingWithStateTestBase {
    /**
     * Inserts a filtered projection of a registered data stream into an
     * {@code UnsafeMemoryAppendTableSink} via {@code INSERT INTO} and compares the rows found in
     * {@code MemoryTableSourceSinkUtil.tableData()} with the expected text.
     */
    @Test
    public void testInsertIntoAppendStreamToTableSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);
        // Clear MemoryTableSourceSinkUtil before running; its tableData() is read back below.
        MemoryTableSourceSinkUtil$.MODULE$.clear();

        // Source stream: the second tuple field supplies the ascending timestamp.
        DataStream input = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)r -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendStreamToTableSink$1(r)));
        Expression[] sourceFields = new Expression[]{
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")),
            package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "t")).rowtime()
        };
        tEnv.registerDataStream("sourceTable", input, (Seq)Predef$.MODULE$.wrapRefArray((Object[])sourceFields));

        // Sink schema and registration.
        String[] fieldNames = new String[]{"d", "e", "t"};
        TypeInformation[] fieldTypes = new TypeInformation[]{
            Types$.MODULE$.STRING(),
            Types$.MODULE$.SQL_TIMESTAMP(),
            Types$.MODULE$.LONG()
        };
        MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink();
        tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, (TableSink)sink);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT c, t, b\n         |FROM sourceTable\n         |WHERE a < 3 OR a > 19\n       ")).stripMargin();
        tEnv.sqlUpdate(insertSql);
        env.execute();

        // Expected rows, one per line.
        String expected = String.join(
            "\n",
            "Hi,1970-01-01 00:00:00.001,1",
            "Hello,1970-01-01 00:00:00.002,2",
            "Comment#14,1970-01-01 00:00:00.006,6",
            "Comment#15,1970-01-01 00:00:00.006,6");
        java.util.List actualRows = (java.util.List)JavaConverters$.MODULE$.bufferAsJavaListConverter(MemoryTableSourceSinkUtil$.MODULE$.tableData()).asJava();
        TestBaseUtils.compareResultAsText(actualRows, expected);
    }

    @Test
    public void testInsertIntoUpdatingTableToRetractSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);
        DataStream t = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoUpdatingTableToRetractSink$1(x$1)));
        tEnv.registerDataStream("sourceTable", t, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text"))}));
        tEnv.registerTableSink("targetTable", (String[])((Object[])new String[]{"len", "cntid", "sumnum"}), (TypeInformation[])((Object[])new TypeInformation[]{Types$.MODULE$.INT(), Types$.MODULE$.LONG(), Types$.MODULE$.LONG()}), (TableSink)new TestRetractSink());
        tEnv.sqlUpdate(new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum\n         |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)\n         |GROUP BY len\n       ")).stripMargin());
        env.execute();
        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        List retracted = (List)RowCollector$.MODULE$.retractResults(results).sorted((Ordering)Ordering.String$.MODULE$);
        List expected = (List)new .colon.colon((Object)"2,1,1", (List)new .colon.colon((Object)"5,1,2", (List)new .colon.colon((Object)"11,1,2", (List)new .colon.colon((Object)"25,1,3", (List)new .colon.colon((Object)"10,7,39", (List)new .colon.colon((Object)"14,1,3", (List)new .colon.colon((Object)"9,9,41", (List)Nil$.MODULE$))))))).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    @Test
    public void testInsertIntoAppendTableToRetractSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);
        DataStream t = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendTableToRetractSink$1(x$2)));
        tEnv.registerDataStream("sourceTable", t, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text")), package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}));
        tEnv.registerTableSink("targetTable", (String[])((Object[])new String[]{"wend", "cntid", "sumnum"}), (TypeInformation[])((Object[])new TypeInformation[]{Types$.MODULE$.SQL_TIMESTAMP(), Types$.MODULE$.LONG(), Types$.MODULE$.LONG()}), (TableSink)new TestRetractSink());
        tEnv.sqlUpdate(new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT\n         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,\n         |  COUNT(id) AS cntid,\n         |  SUM(num) AS sumnum\n         |FROM sourceTable\n         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n       ")).stripMargin());
        env.execute();
        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        Assert.assertFalse((String)"Received retraction messages for append only table", (boolean)results.exists((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoAppendTableToRetractSink$2(x$3))));
        List retracted = (List)RowCollector$.MODULE$.retractResults(results).sorted((Ordering)Ordering.String$.MODULE$);
        List expected = (List)new .colon.colon((Object)"1970-01-01 00:00:00.005,4,8", (List)new .colon.colon((Object)"1970-01-01 00:00:00.01,5,18", (List)new .colon.colon((Object)"1970-01-01 00:00:00.015,5,24", (List)new .colon.colon((Object)"1970-01-01 00:00:00.02,5,29", (List)new .colon.colon((Object)"1970-01-01 00:00:00.025,2,12", (List)Nil$.MODULE$))))).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    @Test
    public void testInsertIntoUpdatingTableWithFullKeyToUpsertSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);
        DataStream t = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoUpdatingTableWithFullKeyToUpsertSink$1(x$4)));
        tEnv.registerDataStream("sourceTable", t, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text"))}));
        tEnv.registerTableSink("targetTable", (String[])((Object[])new String[]{"cnt", "cntid", "cTrue"}), (TypeInformation[])((Object[])new TypeInformation[]{Types$.MODULE$.LONG(), Types$.MODULE$.LONG(), Types$.MODULE$.BOOLEAN()}), (TableSink)new TestUpsertSink((String[])((Object[])new String[]{"cnt", "cTrue"}), false));
        tEnv.sqlUpdate(new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT cnt, COUNT(len) AS cntid, cTrue\n         |FROM\n         |  (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt\n         |   FROM sourceTable\n         |   GROUP BY CHAR_LENGTH(text), (id > 0)\n         |   )\n         |GROUP BY cnt, cTrue\n       ")).stripMargin());
        env.execute();
        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        Assert.assertTrue((String)"Results must include delete messages", (boolean)results.exists((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoUpdatingTableWithFullKeyToUpsertSink$2(x$5))));
        List retracted = (List)RowCollector$.MODULE$.upsertResults(results, new int[]{0, 2}).sorted((Ordering)Ordering.String$.MODULE$);
        List expected = (List)new .colon.colon((Object)"1,5,true", (List)new .colon.colon((Object)"7,1,true", (List)new .colon.colon((Object)"9,1,true", (List)Nil$.MODULE$))).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    /**
     * Inserts a per-{@code num} tumbling-window count into a {@code TestUpsertSink} constructed
     * with keys {@code wend} and {@code num}. Asserts that no collected message carries a
     * {@code false} flag and that {@code RowCollector.upsertResults} (key positions 0 and 1)
     * equals the expected rows after string sorting.
     */
    @Test
    public void testInsertIntoAppendingTableWithFullKey1ToUpsertSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);

        // The first tuple field (an int, widened to long) supplies the ascending timestamp.
        DataStream source = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithFullKey1ToUpsertSink$1(x$6)));
        Expression[] sourceFields = new Expression[]{
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text")),
            package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()
        };
        tEnv.registerDataStream("sourceTable", source, (Seq)Predef$.MODULE$.wrapRefArray((Object[])sourceFields));

        String[] sinkNames = new String[]{"num", "wend", "cntid"};
        TypeInformation[] sinkTypes = new TypeInformation[]{
            Types$.MODULE$.LONG(),
            Types$.MODULE$.SQL_TIMESTAMP(),
            Types$.MODULE$.LONG()
        };
        TestUpsertSink upsertSink = new TestUpsertSink(new String[]{"wend", "num"}, true);
        tEnv.registerTableSink("targetTable", sinkNames, sinkTypes, (TableSink)upsertSink);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT\n         |  num,\n         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,\n         |  COUNT(id) AS cntid\n         |FROM sourceTable\n         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num\n       ")).stripMargin();
        tEnv.sqlUpdate(insertSql);
        env.execute();

        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        // The helper predicate is true for messages whose f0 flag is false.
        boolean sawFalseFlag = results.exists((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithFullKey1ToUpsertSink$2(x$7)));
        Assert.assertFalse("Received retraction messages for append only table", sawFalseFlag);

        List retracted = (List)RowCollector$.MODULE$.upsertResults(results, new int[]{0, 1}).sorted((Ordering)Ordering.String$.MODULE$);
        String[] expectedRows = new String[]{
            "1,1970-01-01 00:00:00.005,1",
            "2,1970-01-01 00:00:00.005,2",
            "3,1970-01-01 00:00:00.005,1",
            "3,1970-01-01 00:00:00.01,2",
            "4,1970-01-01 00:00:00.01,3",
            "4,1970-01-01 00:00:00.015,1",
            "5,1970-01-01 00:00:00.015,4",
            "5,1970-01-01 00:00:00.02,1",
            "6,1970-01-01 00:00:00.02,4",
            "6,1970-01-01 00:00:00.025,2"
        };
        List expected = (List)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows)).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    /**
     * Inserts a per-{@code num} tumbling-window count (with window start and end) into a
     * {@code TestUpsertSink} constructed with keys {@code wstart}, {@code wend} and {@code num}.
     * Asserts that no collected message carries a {@code false} flag and that
     * {@code RowCollector.upsertResults} (key positions 0, 1 and 2) equals the expected rows
     * after string sorting.
     */
    @Test
    public void testInsertIntoAppendingTableWithFullKey2ToUpsertSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);

        // The first tuple field (an int, widened to long) supplies the ascending timestamp.
        DataStream source = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$8 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithFullKey2ToUpsertSink$1(x$8)));
        Expression[] sourceFields = new Expression[]{
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text")),
            package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()
        };
        tEnv.registerDataStream("sourceTable", source, (Seq)Predef$.MODULE$.wrapRefArray((Object[])sourceFields));

        String[] sinkNames = new String[]{"wstart", "wend", "num", "cntid"};
        TypeInformation[] sinkTypes = new TypeInformation[]{
            Types$.MODULE$.SQL_TIMESTAMP(),
            Types$.MODULE$.SQL_TIMESTAMP(),
            Types$.MODULE$.LONG(),
            Types$.MODULE$.LONG()
        };
        TestUpsertSink upsertSink = new TestUpsertSink(new String[]{"wstart", "wend", "num"}, true);
        tEnv.registerTableSink("targetTable", sinkNames, sinkTypes, (TableSink)upsertSink);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT\n         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,\n         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,\n         |  num,\n         |  COUNT(id) AS cntid\n         |FROM sourceTable\n         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num\n       ")).stripMargin();
        tEnv.sqlUpdate(insertSql);
        env.execute();

        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        // The helper predicate is true for messages whose f0 flag is false.
        boolean sawFalseFlag = results.exists((Function1 & Serializable & scala.Serializable)x$9 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithFullKey2ToUpsertSink$2(x$9)));
        Assert.assertFalse("Received retraction messages for append only table", sawFalseFlag);

        List retracted = (List)RowCollector$.MODULE$.upsertResults(results, new int[]{0, 1, 2}).sorted((Ordering)Ordering.String$.MODULE$);
        String[] expectedRows = new String[]{
            "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
            "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
            "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
            "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
            "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
            "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
            "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
            "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
            "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
            "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2"
        };
        List expected = (List)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows)).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    /**
     * Inserts a per-{@code num} tumbling-window count, projecting only the window end and the
     * count, into a {@code TestUpsertSink} constructed with {@code null} keys. Asserts that no
     * collected message carries a {@code false} flag and that the string form of every collected
     * row equals the expected rows after string sorting.
     */
    @Test
    public void testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);

        // The first tuple field (an int, widened to long) supplies the ascending timestamp.
        DataStream source = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$10 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink$1(x$10)));
        Expression[] sourceFields = new Expression[]{
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text")),
            package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()
        };
        tEnv.registerDataStream("sourceTable", source, (Seq)Predef$.MODULE$.wrapRefArray((Object[])sourceFields));

        String[] sinkNames = new String[]{"wend", "cntid"};
        TypeInformation[] sinkTypes = new TypeInformation[]{
            Types$.MODULE$.SQL_TIMESTAMP(),
            Types$.MODULE$.LONG()
        };
        TestUpsertSink upsertSink = new TestUpsertSink(null, true);
        tEnv.registerTableSink("targetTable", sinkNames, sinkTypes, (TableSink)upsertSink);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT\n         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,\n         |  COUNT(id) AS cntid\n         |FROM sourceTable\n         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num\n       ")).stripMargin();
        tEnv.sqlUpdate(insertSql);
        env.execute();

        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        // The helper predicate is true for messages whose f0 flag is false.
        boolean sawFalseFlag = results.exists((Function1 & Serializable & scala.Serializable)x$11 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink$2(x$11)));
        Assert.assertFalse("Received retraction messages for append only table", sawFalseFlag);

        // Results are compared via each row's toString(); RowCollector.upsertResults is not used here.
        List retracted = (List)((SeqLike)results.map((Function1 & Serializable & scala.Serializable)x$12 -> ((Row)x$12.f1).toString(), List$.MODULE$.canBuildFrom())).sorted((Ordering)Ordering.String$.MODULE$);
        String[] expectedRows = new String[]{
            "1970-01-01 00:00:00.005,1",
            "1970-01-01 00:00:00.005,2",
            "1970-01-01 00:00:00.005,1",
            "1970-01-01 00:00:00.01,2",
            "1970-01-01 00:00:00.01,3",
            "1970-01-01 00:00:00.015,1",
            "1970-01-01 00:00:00.015,4",
            "1970-01-01 00:00:00.02,1",
            "1970-01-01 00:00:00.02,4",
            "1970-01-01 00:00:00.025,2"
        };
        List expected = (List)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows)).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    /**
     * Inserts a per-{@code num} tumbling-window count, projecting only {@code num} and the count,
     * into a {@code TestUpsertSink} constructed with {@code null} keys. Asserts that no collected
     * message carries a {@code false} flag and that the string form of every collected row equals
     * the expected rows after string sorting.
     */
    @Test
    public void testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment$.MODULE$.getTableEnvironment(env);

        // The first tuple field (an int, widened to long) supplies the ascending timestamp.
        DataStream source = StreamTestData$.MODULE$.get3TupleDataStream(env).assignAscendingTimestamps((Function1 & Serializable & scala.Serializable)x$13 -> BoxesRunTime.boxToLong((long)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink$1(x$13)));
        Expression[] sourceFields = new Expression[]{
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "num")),
            package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "text")),
            package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()
        };
        tEnv.registerDataStream("sourceTable", source, (Seq)Predef$.MODULE$.wrapRefArray((Object[])sourceFields));

        String[] sinkNames = new String[]{"num", "cntid"};
        TypeInformation[] sinkTypes = new TypeInformation[]{
            Types$.MODULE$.LONG(),
            Types$.MODULE$.LONG()
        };
        TestUpsertSink upsertSink = new TestUpsertSink(null, true);
        tEnv.registerTableSink("targetTable", sinkNames, sinkTypes, (TableSink)upsertSink);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("INSERT INTO targetTable\n         |SELECT\n         |  num,\n         |  COUNT(id) AS cntid\n         |FROM sourceTable\n         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num\n       ")).stripMargin();
        tEnv.sqlUpdate(insertSql);
        env.execute();

        List<Tuple2<Boolean, Row>> results = RowCollector$.MODULE$.getAndClearValues();
        // The helper predicate is true for messages whose f0 flag is false.
        boolean sawFalseFlag = results.exists((Function1 & Serializable & scala.Serializable)x$14 -> BoxesRunTime.boxToBoolean((boolean)InsertIntoITCase.$anonfun$testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink$2(x$14)));
        Assert.assertFalse("Received retraction messages for append only table", sawFalseFlag);

        // Results are compared via each row's toString(); RowCollector.upsertResults is not used here.
        List retracted = (List)((SeqLike)results.map((Function1 & Serializable & scala.Serializable)x$15 -> ((Row)x$15.f1).toString(), List$.MODULE$.canBuildFrom())).sorted((Ordering)Ordering.String$.MODULE$);
        String[] expectedRows = new String[]{
            "1,1", "2,2", "3,1", "3,2", "4,3",
            "4,1", "5,4", "5,1", "6,4", "6,2"
        };
        List expected = (List)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows)).sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals((Object)expected, (Object)retracted);
    }

    /**
     * Synthetic body of the timestamp lambda in {@code testInsertIntoAppendStreamToTableSink}:
     * unboxes the tuple's second field as a {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendStreamToTableSink$1(Tuple3 r) {
        final long timestamp = BoxesRunTime.unboxToLong((Object)r._2());
        return timestamp;
    }

    /**
     * Synthetic body of the timestamp lambda in {@code testInsertIntoUpdatingTableToRetractSink}:
     * unboxes the tuple's first field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoUpdatingTableToRetractSink$1(Tuple3 x$1) {
        final int first = BoxesRunTime.unboxToInt((Object)x$1._1());
        return (long)first;
    }

    /**
     * Synthetic body of the timestamp lambda in {@code testInsertIntoAppendTableToRetractSink}:
     * unboxes the tuple's first field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendTableToRetractSink$1(Tuple3 x$2) {
        final int first = BoxesRunTime.unboxToInt((Object)x$2._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoAppendTableToRetractSink}: returns
     * {@code true} when the message's {@code f0} flag is {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoAppendTableToRetractSink$2(Tuple2 x$3) {
        final boolean flag = Predef$.MODULE$.Boolean2boolean((Boolean)x$3.f0);
        return !flag;
    }

    /**
     * Synthetic body of the timestamp lambda in
     * {@code testInsertIntoUpdatingTableWithFullKeyToUpsertSink}: unboxes the tuple's first field
     * as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoUpdatingTableWithFullKeyToUpsertSink$1(Tuple3 x$4) {
        final int first = BoxesRunTime.unboxToInt((Object)x$4._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoUpdatingTableWithFullKeyToUpsertSink}:
     * compares the message's {@code f0} flag with boxed {@code false} via
     * {@code BoxesRunTime.equals}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoUpdatingTableWithFullKeyToUpsertSink$2(Tuple2 x$5) {
        final Object boxedFalse = (Object)BoxesRunTime.boxToBoolean((boolean)false);
        return BoxesRunTime.equals((Object)x$5.f0, boxedFalse);
    }

    /**
     * Synthetic body of the timestamp lambda in
     * {@code testInsertIntoAppendingTableWithFullKey1ToUpsertSink}: unboxes the tuple's first
     * field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendingTableWithFullKey1ToUpsertSink$1(Tuple3 x$6) {
        final int first = BoxesRunTime.unboxToInt((Object)x$6._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoAppendingTableWithFullKey1ToUpsertSink}:
     * returns {@code true} when the message's {@code f0} flag is {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoAppendingTableWithFullKey1ToUpsertSink$2(Tuple2 x$7) {
        final boolean flag = Predef$.MODULE$.Boolean2boolean((Boolean)x$7.f0);
        return !flag;
    }

    /**
     * Synthetic body of the timestamp lambda in
     * {@code testInsertIntoAppendingTableWithFullKey2ToUpsertSink}: unboxes the tuple's first
     * field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendingTableWithFullKey2ToUpsertSink$1(Tuple3 x$8) {
        final int first = BoxesRunTime.unboxToInt((Object)x$8._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoAppendingTableWithFullKey2ToUpsertSink}:
     * returns {@code true} when the message's {@code f0} flag is {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoAppendingTableWithFullKey2ToUpsertSink$2(Tuple2 x$9) {
        final boolean flag = Predef$.MODULE$.Boolean2boolean((Boolean)x$9.f0);
        return !flag;
    }

    /**
     * Synthetic body of the timestamp lambda in
     * {@code testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink}: unboxes the tuple's first
     * field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink$1(Tuple3 x$10) {
        final int first = BoxesRunTime.unboxToInt((Object)x$10._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink}:
     * returns {@code true} when the message's {@code f0} flag is {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink$2(Tuple2 x$11) {
        final boolean flag = Predef$.MODULE$.Boolean2boolean((Boolean)x$11.f0);
        return !flag;
    }

    /**
     * Synthetic body of the timestamp lambda in
     * {@code testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink}: unboxes the tuple's first
     * field as an {@code int} and widens it to {@code long}.
     */
    public static final /* synthetic */ long $anonfun$testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink$1(Tuple3 x$13) {
        final int first = BoxesRunTime.unboxToInt((Object)x$13._1());
        return (long)first;
    }

    /**
     * Synthetic predicate used by {@code testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink}:
     * returns {@code true} when the message's {@code f0} flag is {@code false}.
     */
    public static final /* synthetic */ boolean $anonfun$testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink$2(Tuple2 x$14) {
        final boolean flag = Predef$.MODULE$.Boolean2boolean((Boolean)x$14.f0);
        return !flag;
    }
}

