package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.MutableList;
import scala.collection.mutable.MutableList$;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: IntervalJoinITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005}a\u0001B\u0001\u0003\u0001M\u0011!#\u00138uKJ4\u0018\r\u001c&pS:LEkQ1tK*\u00111\u0001B\u0001\u0004gFd'BA\u0003\u0007\u0003\u0019\u0019HO]3b[*\u0011q\u0001C\u0001\beVtG/[7f\u0015\tI!\"A\u0004qY\u0006tg.\u001a:\u000b\u0005-a\u0011!\u0002;bE2,'BA\u0007\u000f\u0003\u00151G.\u001b8l\u0015\ty\u0001#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002#\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0006\t\u0003+ai\u0011A\u0006\u0006\u0003/\u0019\tQ!\u001e;jYNL!!\u0007\f\u00035M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\t\u0011m\u0001!\u0011!Q\u0001\nq\tA!\\8eKB\u0011Q$\r\b\u0003==r!a\b\u0018\u000f\u0005\u0001jcBA\u0011-\u001d\t\u00113F\u0004\u0002$U9\u0011A%\u000b\b\u0003K!j\u0011A\n\u0006\u0003OI\ta\u0001\u0010:p_Rt\u0014\"A\t\n\u0005=\u0001\u0012BA\u0007\u000f\u0013\tYA\"\u0003\u0002\n\u0015%\u0011q\u0001C\u0005\u0003/\u0019I!\u0001\r\f\u00025M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\n\u0005I\u001a$\u0001E*uCR,')Y2lK:$Wj\u001c3f\u0015\t\u0001d\u0003C\u00036\u0001\u0011\u0005a'\u0001\u0004=S:LGO\u0010\u000b\u0003oe\u0002\"\u0001\u000f\u0001\u000e\u0003\tAQa\u0007\u001bA\u0002qAQa\u000f\u0001\u0005\u0002q\n\u0001\u0004^3tiB\u0013xnY3tgRKW.Z%o]\u0016\u0014(j\\5o)\u0005i\u0004C\u0001 
B\u001b\u0005y$\"\u0001!\u0002\u000bM\u001c\u0017\r\\1\n\u0005\t{$\u0001B+oSRD#A\u000f#\u0011\u0005\u0015CU\"\u0001$\u000b\u0005\u001d\u0003\u0012!\u00026v]&$\u0018BA%G\u0005\u0011!Vm\u001d;\t\u000b-\u0003A\u0011\u0001\u001f\u0002WQ,7\u000f\u001e)s_\u000e,7o\u001d+j[\u0016LeN\\3s\u0015>LgnV5uQ>#\b.\u001a:D_:$\u0017\u000e^5p]ND#A\u0013#\t\u000b9\u0003A\u0011\u0001\u001f\u0002)Q,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5oQ\tiE\tC\u0003R\u0001\u0011\u0005A(A\u0015uKN$(k\\<US6,\u0017J\u001c8fe*{\u0017N\\,ji\"|W\u000f^#rk\u0006d7i\u001c8eSRLwN\u001c\u0015\u0003!\u0012CQ\u0001\u0016\u0001\u0005\u0002q\nQ\u0005^3tiVs'm\\;oI\u0016$\u0017iZ4BMR,'OU8xi&lW-\u00138oKJTu.\u001b8)\u0005M#\u0005\"B,\u0001\t\u0003a\u0014!\n;fgR\u0014vn\u001e+j[\u0016LeN\\3s\u0015>LgnV5uQ\u0016\u000bX/\u001b+j[\u0016\fE\u000f\u001e:tQ\t1F\tC\u0003[\u0001\u0011\u0005A(A\u0014uKN$(k\\<US6,\u0017J\u001c8fe*{\u0017N\\,ji\"|E\u000f[3s\u0007>tG-\u001b;j_:\u001c\bFA-E\u0011\u0015i\u0006\u0001\"\u0001=\u0003)\"Xm\u001d;S_^$\u0016.\\3J]:,'OS8j]^KG\u000f[(uQ\u0016\u0014H+[7f\u0007>tG-\u001b;j_:D#\u0001\u0018#\t\u000b\u0001\u0004A\u0011\u0001\u001f\u0002eQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o/&$\bnV5oI><\u0018iZ4sK\u001e\fG/Z(o\r&\u00148\u000f\u001e+j[\u0016D#a\u0018#\t\u000b\r\u0004A\u0011\u0001\u001f\u0002gQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o/&$\bnV5oI><\u0018iZ4sK\u001e\fG/Z(o'\u0016\u001cwN\u001c3US6,\u0007F\u00012E\u0011\u00151\u0007\u0001\"\u0001=\u0003e!Xm\u001d;Qe>\u001cG+[7f\u0019\u00164GoT;uKJTu.\u001b8)\u0005\u0015$\u0005\"B5\u0001\t\u0003a\u0014\u0001\u0007;fgR\u0014vn\u001e+j[\u0016dUM\u001a;PkR,'OS8j]\"\u0012\u0001\u000e\u0012\u0005\u0006Y\u0002!\t\u0001P\u0001-i\u0016\u001cHOU8x)&lW\rT3gi>+H/\u001a:K_&tg*Z4bi&4X-\u00138uKJ4\u0018\r\\*ju\u0016D#a\u001b#\t\u000b=\u0004A\u0011\u0001\u001f\u00025Q,7\u000f\u001e)s_\u000e$\u0016.\\3SS\u001eDGoT;uKJTu.\u001b8)\u00059$\u0005\"\u0002:\u0001\t\u0003a\u0014!\u0007;fgR\u0014vn\u001e+j[\u0016\u0014\u0016n\u001a5u\u001fV$XM\u001d&pS:D#!\u001d#\t\
u000bU\u0004A\u0011\u0001\u001f\u0002[Q,7\u000f\u001e*poRKW.\u001a*jO\"$x*\u001e;fe*{\u0017N\u001c(fO\u0006$\u0018N^3J]R,'O^1m'&TX\r\u000b\u0002u\t\")\u0001\u0010\u0001C\u0001y\u0005IB/Z:u!J|7\rV5nK\u001a+H\u000e\\(vi\u0016\u0014(j\\5oQ\t9H\tC\u0003|\u0001\u0011\u0005A(\u0001\ruKN$(k\\<US6,g)\u001e7m\u001fV$XM\u001d&pS:D#A\u001f#\t\u000by\u0004A\u0011\u0001\u001f\u0002YQ,7\u000f\u001e*poRKW.\u001a$vY2|U\u000f^3s\u0015>LgNT3hCRLg/Z%oi\u0016\u0014h/\u00197TSj,\u0007FA?EQ\u001d\u0001\u00111AA\b\u0003#\u0001B!!\u0002\u0002\f5\u0011\u0011q\u0001\u0006\u0004\u0003\u00131\u0015A\u0002:v]:,'/\u0003\u0003\u0002\u000e\u0005\u001d!a\u0002*v]^KG\u000f[\u0001\u0006m\u0006dW/Z\u0012\u0003\u0003'\u0001B!!\u0006\u0002\u001c5\u0011\u0011q\u0003\u0006\u0004\u000331\u0015a\u0002:v]:,'o]\u0005\u0005\u0003;\t9BA\u0007QCJ\fW.\u001a;fe&TX\r\u001a")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.class */
public class IntervalJoinITCase extends StreamingWithStateTestBase {
    // Cached Scala Symbol instances used as column names when converting streams to tables.
    // They are initialised once here and only read by the test methods, so they are final.
    private static final Symbol symbol$1 = Symbol$.MODULE$.apply("a");
    private static final Symbol symbol$2 = Symbol$.MODULE$.apply("b");
    private static final Symbol symbol$3 = Symbol$.MODULE$.apply("c");
    private static final Symbol symbol$4 = Symbol$.MODULE$.apply("proctime");
    private static final Symbol symbol$5 = Symbol$.MODULE$.apply("key");
    private static final Symbol symbol$6 = Symbol$.MODULE$.apply("id");
    private static final Symbol symbol$7 = Symbol$.MODULE$.apply("rowtime");
    private static final Symbol symbol$8 = Symbol$.MODULE$.apply("tm");

    /**
     * Executes a processing-time interval inner join of T1 and T2 with parallelism 1.
     *
     * <p>T1 and T2 are views over TmpT1 and TmpT2 in which column {@code a} is replaced by a
     * NULL INT whenever it equals 1. The join matches on {@code a} and requires t1.proctime to
     * lie within 5 seconds before or after t2.proctime. The job is only executed; nothing is
     * asserted about the rows that reach the sink.
     */
    @Test
    public void testProcessTimeInnerJoin() {
        env().setParallelism(1);
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t2.c, t1.c\n        |FROM T1 as t1 join T2 as t2 ON\n        |  t1.a = t2.a AND\n        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n        |    t2.proctime + INTERVAL '5' SECOND\n        |")).stripMargin();

        // Left input rows (a, b, c).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "Hi1"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "Hi2"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(5L), "Hi3"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(7L), "Hi5"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(9L), "Hi6"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(8L), "Hi8"));

        // Right input rows (a, b, c).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "HiHi"));
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(2L), "HeHe"));

        // Register the raw left stream, then the view that nulls out a = 1.
        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime()
        };
        Table rawLeft = package$.MODULE$
            .dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$39(this)))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));
        tEnv().registerTable("TmpT1", rawLeft);
        Table left = tEnv().sqlQuery("SELECT IF(a = 1, CAST(NULL AS INT), a) as a, b, c, proctime FROM TmpT1");
        tEnv().registerTable("T1", left);

        // Same two steps for the right stream.
        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime()
        };
        Table rawRight = package$.MODULE$
            .dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$40(this)))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));
        tEnv().registerTable("TmpT2", rawRight);
        Table right = tEnv().sqlQuery("SELECT IF(a = 1, CAST(NULL AS INT), a) as a, b, c, proctime FROM TmpT2");
        tEnv().registerTable("T2", right);

        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(new TestingAppendSink());
        env().execute();
    }

    /**
     * Executes a processing-time interval inner join with an extra equality on {@code b}
     * (parallelism 2) and verifies that rows with a null join key never produce output.
     *
     * <p>Each input contains one row whose key {@code a} is null. The previous assertion called
     * {@code contains("null")} on the result list, which only tests whether some element is
     * exactly equal to the string "null". Result rows are comma-joined multi-column strings, so
     * that check could never fail. Each row is now inspected individually.
     */
    @Test
    public void testProcessTimeInnerJoinWithOtherConditions() {
        env().setParallelism(2);
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t2.c, t1.c\n        |FROM T1 as t1 JOIN T2 as t2 ON\n        |  t1.a = t2.a AND\n        |  t1.proctime BETWEEN t2.proctime - interval '5' SECOND AND\n        |    t2.proctime + interval '5' second AND\n        |  t1.b = t2.b\n        |")).stripMargin();

        // Left input rows (a, b, c); the last row carries a null key.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(1L), "Hi1"));
        leftRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(2L), "Hi2"));
        leftRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(5L), "Hi3"));
        leftRows.$plus$eq(new Tuple3("2", BoxesRunTime.boxToLong(7L), "Hi5"));
        leftRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(9L), "Hi6"));
        leftRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(8L), "Hi8"));
        leftRows.$plus$eq(new Tuple3((Object) null, BoxesRunTime.boxToLong(20L), "leftNull"));

        // Right input rows (a, b, c); the last row carries a null key.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("1", BoxesRunTime.boxToLong(5L), "HiHi"));
        rightRows.$plus$eq(new Tuple3("2", BoxesRunTime.boxToLong(2L), "HeHe"));
        rightRows.$plus$eq(new Tuple3((Object) null, BoxesRunTime.boxToLong(20L), "rightNull"));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$41(this)))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$4).proctime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$42(this)))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(joinSql))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        // No emitted row may contain a null column. The payload strings of the null-key rows
        // ("leftNull"/"rightNull") use a capital N, so a lowercase "null" in a row can only
        // come from a null value being rendered.
        scala.collection.Iterator<?> rows = sink.getAppendResults().iterator();
        while (rows.hasNext()) {
            String row = String.valueOf(rows.next());
            Assert.assertFalse("Join emitted a row containing a null key: " + row, row.contains("null"));
        }
    }

    /**
     * Runs an event-time interval inner join of T1 and T2 on {@code key}, where t1.rowtime must
     * lie between 5 seconds before and 6 seconds after t2.rowtime.
     *
     * <p>Both inputs include a row with a null key. The sorted sink output is compared with the
     * expected list, which contains only rows for keys A and B.
     */
    @Test
    public void testRowTimeInnerJoin() {
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 as t1 join T2 as t2 ON\n        |  t1.key = t2.key AND\n        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |    t2.rowtime + INTERVAL '6' SECOND\n        |")).stripMargin();

        // Left input rows (key, id, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "LEFT0.999", BoxesRunTime.boxToLong(999L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple3("B", "LEFT4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3((Object) null, "LEFT8", BoxesRunTime.boxToLong(8000L)));

        // Right input rows (key, id, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "RIGHT6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "RIGHT7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3((Object) null, "RIGHT10", BoxesRunTime.boxToLong(10000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$43(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$44(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {
            "A,RIGHT6,LEFT1",
            "A,RIGHT6,LEFT2",
            "A,RIGHT6,LEFT3",
            "A,RIGHT6,LEFT5",
            "A,RIGHT6,LEFT6",
            "B,RIGHT7,LEFT4"
        };
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)),
            sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs an event-time interval inner join of T1 and T2 that has no equality predicate: the
     * only condition is that t1.rowtime lies between 5 seconds before and 6 seconds after
     * t2.rowtime.
     *
     * <p>Because keys are not compared, the expected output pairs rows across different keys and
     * includes rows whose key is null. The sorted sink output is compared with that list.
     */
    @Test
    public void testRowTimeInnerJoinWithoutEqualCondition() {
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 as t1 join T2 as t2 ON\n        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |    t2.rowtime + INTERVAL '6' SECOND\n        |")).stripMargin();

        // Left input rows (key, id, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "LEFT0.999", BoxesRunTime.boxToLong(999L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple3("B", "LEFT4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3((Object) null, "LEFT8", BoxesRunTime.boxToLong(8000L)));

        // Right input rows (key, id, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "RIGHT6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "RIGHT7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3((Object) null, "RIGHT10", BoxesRunTime.boxToLong(10000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$45(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$46(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {
            "A,RIGHT6,LEFT1",
            "A,RIGHT6,LEFT2",
            "A,RIGHT6,LEFT3",
            "A,RIGHT6,LEFT4",
            "A,RIGHT6,LEFT5",
            "A,RIGHT6,LEFT6",
            "A,RIGHT6,LEFT8",
            "B,RIGHT7,LEFT2",
            "B,RIGHT7,LEFT3",
            "B,RIGHT7,LEFT4",
            "B,RIGHT7,LEFT5",
            "B,RIGHT7,LEFT6",
            "B,RIGHT7,LEFT8",
            "null,RIGHT10,LEFT5",
            "null,RIGHT10,LEFT6",
            "null,RIGHT10,LEFT8"
        };
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)),
            sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Applies an unbounded GROUP BY aggregation on top of an event-time interval inner join.
     *
     * <p>The inner query joins T1 and T2 on {@code key} with t1.rowtime between 5 seconds before
     * and 6 seconds after t2.rowtime; the outer query counts distinct ids from each side per
     * key. Results are collected through a retract sink and compared, sorted, with the expected
     * per-key counts.
     */
    @Test
    public void testUnboundedAggAfterRowtimeInnerJoin() {
        String innerJoinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key as key, t2.id as id1, t1.id as id2\n        |FROM T1 as t1 join T2 as t2 ON\n        |  t1.key = t2.key AND\n        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |    t2.rowtime + INTERVAL '6' SECOND\n        |")).stripMargin();
        String aggSql = new StringBuilder()
            .append("SELECT key, COUNT(DISTINCT id1), COUNT(DISTINCT id2) FROM (")
            .append(innerJoinSql)
            .append(") GROUP BY key")
            .toString();

        // Left input rows (key, id, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "LEFT0.999", BoxesRunTime.boxToLong(999L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple3("B", "LEFT4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple3("A", "LEFT6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3((Object) null, "LEFT8", BoxesRunTime.boxToLong(8000L)));

        // Right input rows (key, id, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "RIGHT6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "RIGHT7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3((Object) null, "RIGHT10", BoxesRunTime.boxToLong(10000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$47(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$48(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingRetractSink sink = new TestingRetractSink();
        Table aggregated = tEnv().sqlQuery(aggSql);
        package$.MODULE$
            .tableConversions(aggregated)
            .toRetractStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {"A,1,5", "B,1,1"};
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)),
            sink.getRetractResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Joins T1 and T2 on equal {@code key} and equal {@code rowtime} columns.
     *
     * <p>T1 is built from 4-field tuples mapped to (id, tm, key, rowtime) and T2 from 3-field
     * tuples mapped to (key, id, rowtime); here the rowtime columns are declared as plain field
     * references. The sorted sink output is compared with the sorted expected rows.
     */
    @Test
    public void testRowTimeInnerJoinWithEquiTimeAttrs() {
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 JOIN T2 AS t2 ON\n        |t1.key = t2.key AND\n        |t2.rowtime = t1.rowtime\n      ")).stripMargin();

        // Left input rows (id, tm, key, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(4), BoxesRunTime.boxToLong(4000L), "A", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(5), BoxesRunTime.boxToLong(5000L), "A", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(6), BoxesRunTime.boxToLong(6000L), "A", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(6), BoxesRunTime.boxToLong(6000L), "B", BoxesRunTime.boxToLong(6000L)));

        // Right input rows (key, id, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-5", BoxesRunTime.boxToLong(5000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-6", BoxesRunTime.boxToLong(6000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$8),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$49(this))
                    .assignTimestampsAndWatermarks(new Row4WatermarkExtractor()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$5),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$6),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$7)
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$50(this))
                    .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {"A,R-5,5", "B,R-6,6"};
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$),
            sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs an event-time interval inner join that combines the time window with extra
     * non-time predicates.
     *
     * <p>Rows match when {@code a} is equal, t1.rowtime is strictly between t2.rowtime - 5s and
     * t2.rowtime - 1s, t1.b is less than t2.b, and t1.b is greater than 2. The sorted sink output
     * is compared with the sorted expected rows.
     */
    @Test
    public void testRowTimeInnerJoinWithOtherConditions() {
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t1.c, t2.c\n        |FROM T1 as t1 JOIN T2 as t2 ON\n        |  t1.a = t2.a AND\n        |  t1.rowtime > t2.rowtime - INTERVAL '5' SECOND AND\n        |    t1.rowtime < t2.rowtime - INTERVAL '1' SECOND AND\n        |  t1.b < t2.b AND\n        |  t1.b > 2\n        |")).stripMargin();

        // Left input rows (a, b, c, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "LEFT1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(8L), "LEFT1.1", BoxesRunTime.boxToLong(1001L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "LEFT2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(7L), "LEFT3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(5L), "LEFT4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "LEFT4.9", BoxesRunTime.boxToLong(4999L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "LEFT5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(10L), "LEFT6", BoxesRunTime.boxToLong(6000L)));

        // Right input rows (a, b, c, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "RIGHT1", BoxesRunTime.boxToLong(1000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(9L), "RIGHT6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(14L), "RIGHT7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "RIGHT8", BoxesRunTime.boxToLong(8000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$51(this))
                    .assignTimestampsAndWatermarks(new Row4WatermarkExtractor()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$52(this))
                    .assignTimestampsAndWatermarks(new Row4WatermarkExtractor()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {
            "1,LEFT3,RIGHT6",
            "1,LEFT1.1,RIGHT6",
            "2,LEFT4,RIGHT7",
            "1,LEFT4.9,RIGHT6"
        };
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$),
            sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Runs an event-time interval inner join whose predicate also applies a function to the
     * time attribute.
     *
     * <p>Rows match when {@code a} is equal, t1.rowtime is strictly between t2.rowtime - 4s and
     * t2.rowtime, and {@code QUARTER(t1.rowtime)} equals t2.a. The sorted sink output is compared
     * with the sorted expected rows.
     */
    @Test
    public void testRowTimeInnerJoinWithOtherTimeCondition() {
        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t1.c, t2.c\n        |FROM T1 as t1 JOIN T2 as t2 ON\n        |  t1.a = t2.a AND\n        |  t1.rowtime > t2.rowtime - INTERVAL '4' SECOND AND\n        |    t1.rowtime < t2.rowtime AND\n        |  QUARTER(t1.rowtime) = t2.a\n        |")).stripMargin();

        // Left input rows (a, b, c, rowtime).
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "LEFT1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "LEFT2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(7L), "LEFT3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(5L), "LEFT4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "LEFT5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(10L), "LEFT6", BoxesRunTime.boxToLong(6000L)));

        // Right input rows (a, b, c, rowtime).
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "RIGHT1", BoxesRunTime.boxToLong(1000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(9L), "RIGHT6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(8L), "RIGHT7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple4(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(4L), "RIGHT8", BoxesRunTime.boxToLong(8000L)));

        Expression[] leftSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table left = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(leftRows, new IntervalJoinITCase$$anon$53(this))
                    .assignTimestampsAndWatermarks(new Row4WatermarkExtractor()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftSchema));

        Expression[] rightSchema = {
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$1),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$2),
            org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression(symbol$3),
            (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression(symbol$7).rowtime()
        };
        Table right = package$.MODULE$
            .dataStreamConversions(
                env().fromCollection(rightRows, new IntervalJoinITCase$$anon$54(this))
                    .assignTimestampsAndWatermarks(new Row4WatermarkExtractor()))
            .toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightSchema));

        tEnv().registerTable("T1", left);
        tEnv().registerTable("T2", right);

        TestingAppendSink sink = new TestingAppendSink();
        Table joined = tEnv().sqlQuery(joinSql);
        package$.MODULE$
            .tableConversions(joined)
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink(sink);
        env().execute();

        String[] expectedRows = {
            "1,LEFT3,RIGHT6",
            "1,LEFT5,RIGHT6",
            "1,LEFT5,RIGHT8",
            "1,LEFT6,RIGHT8"
        };
        Assert.assertEquals(
            MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$),
            sink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Inner interval join of T1 and T2 on {@code key}, with {@code t1.rowtime} within
     * five seconds either side of {@code t2.rowtime}, followed by a 4-second tumbling
     * window aggregation keyed on the left side's rowtime and key.
     *
     * <p>The sorted append-sink output is compared against the four expected
     * {@code key,window_end,count} rows.
     */
    @Test
    public void testRowTimeInnerJoinWithWindowAggregateOnFirstTime() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-5", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-6", BoxesRunTime.boxToLong(10000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-7", BoxesRunTime.boxToLong(13000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-1", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-4", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-3", BoxesRunTime.boxToLong(8000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-2", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$55(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$56(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.key, TUMBLE_END(t1.rowtime, INTERVAL '4' SECOND), COUNT(t2.key)\n        |FROM T1 AS t1 join T2 AS t2 ON\n        |  t1.key = t2.key AND\n        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |    t2.rowtime + INTERVAL '5' SECOND\n        |GROUP BY TUMBLE(t1.rowtime, INTERVAL '4' SECOND), t1.key\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,1970-01-01T00:00:04,3", "A,1970-01-01T00:00:12,2", "A,1970-01-01T00:00:16,1", "B,1970-01-01T00:00:08,1"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Same inner interval join as the "first time" variant, but the 4-second tumbling
     * window and the grouping key are taken from the right side ({@code t2.rowtime},
     * {@code t2.key}) and the count is over {@code t1.key}.
     *
     * <p>The sorted append-sink output is compared against the three expected rows.
     */
    @Test
    public void testRowTimeInnerJoinWithWindowAggregateOnSecondTime() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-3", BoxesRunTime.boxToLong(3000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-5", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-6", BoxesRunTime.boxToLong(10000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-7", BoxesRunTime.boxToLong(13000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-1", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-4", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-3", BoxesRunTime.boxToLong(8000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-2", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$57(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$58(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, TUMBLE_END(t2.rowtime, INTERVAL '4' SECOND), COUNT(t1.key)\n        |FROM T1 AS t1 join T2 AS t2 ON\n        | t1.key = t2.key AND\n        | t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        | t2.rowtime + INTERVAL '5' SECOND\n        | GROUP BY TUMBLE(t2.rowtime, INTERVAL '4' SECOND), t2.key\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,1970-01-01T00:00:08,3", "A,1970-01-01T00:00:12,3", "B,1970-01-01T00:00:08,1"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Executes a processing-time LEFT OUTER interval join of T1 and T2 on {@code a}
     * with parallelism 1.
     *
     * <p>The sink contents are never asserted; the test passes as long as the job
     * runs to completion (presumably because processing-time output is not
     * deterministic -- confirm against the original Scala test).
     */
    @Test
    public void testProcTimeLeftOuterJoin() {
        env().setParallelism(1);

        // Left input: (Integer, Long, String) tuples.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "Hi1"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "Hi2"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(5L), "Hi3"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(7L), "Hi5"));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "HiHi"));
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(2L), "HeHe"));

        // Three data fields plus an appended processing-time attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$59(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$60(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t2.c, t1.c\n        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON\n        | t1.a = t2.a AND\n        | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n        | t2.proctime + INTERVAL '3' SECOND\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();
    }

    /**
     * Row-time LEFT OUTER interval join of T1 and T2 on {@code key}, with
     * {@code t1.rowtime} between {@code t2.rowtime - 5s} and {@code t2.rowtime + 6s}
     * and the extra non-equi predicate {@code t1.id <> 'L-5'}.
     *
     * <p>Expected output contains the matched pairs plus null-padded rows for the
     * left records without a match (L-5, L-7, L-20).
     */
    @Test
    public void testRowTimeLeftOuterJoin() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-10", BoxesRunTime.boxToLong(10000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-12", BoxesRunTime.boxToLong(12000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-20", BoxesRunTime.boxToLong(20000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-11", BoxesRunTime.boxToLong(11000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$61(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$62(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.key, t2.id, t1.id\n        |FROM T1 AS t1 LEFT OUTER JOIN  T2 AS t2 ON\n        | t1.key = t2.key AND\n        | t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        | t2.rowtime + INTERVAL '6' SECOND AND\n        | t1.id <> 'L-5'\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,R-6,L-1", "A,R-6,L-2", "A,R-6,L-6", "A,R-6,L-10", "A,R-6,L-12", "B,R-7,L-4", "A,R-11,L-6", "A,R-11,L-10", "A,R-11,L-12", "B,null,L-5", "C,null,L-7", "A,null,L-20"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Row-time LEFT OUTER interval join whose time window is empty: the lower bound
     * ({@code t2.rowtime + 3s}) lies above the upper bound ({@code t2.rowtime + 1s}).
     *
     * <p>The expected output is every left row with the right-side columns set to
     * null.
     */
    @Test
    public void testRowTimeLeftOuterJoinNegativeIntervalSize() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$63(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$64(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON\n        | t1.key = t2.key AND\n        |  t1.rowtime BETWEEN t2.rowtime + INTERVAL '3' SECOND AND\n        |  t2.rowtime + INTERVAL '1' SECOND\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"null,null,L-1", "null,null,L-4", "null,null,L-7"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Executes a processing-time RIGHT OUTER interval join of T1 and T2 on {@code a}
     * with parallelism 1.
     *
     * <p>The sink contents are never asserted; the test passes as long as the job
     * runs to completion (presumably because processing-time output is not
     * deterministic -- confirm against the original Scala test).
     */
    @Test
    public void testProcTimeRightOuterJoin() {
        env().setParallelism(1);

        // Left input: (Integer, Long, String) tuples.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "Hi1"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "Hi2"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(5L), "Hi3"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(7L), "Hi5"));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "HiHi"));
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(2L), "HeHe"));

        // Three data fields plus an appended processing-time attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$65(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$66(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t2.c, t1.c\n        |FROM T1 as t1 RIGHT  OUTER JOIN T2 as t2 ON\n        | t1.a = t2.a AND\n        | t1.proctime BETWEEN t2.proctime -  INTERVAL '5' SECOND AND\n        | t2.proctime + INTERVAL '3' SECOND\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();
    }

    /**
     * Row-time RIGHT OUTER interval join of T1 and T2 on {@code key}, with
     * {@code t1.rowtime} between {@code t2.rowtime - 5s} and {@code t2.rowtime + 6s}
     * and the extra non-equi predicate {@code t2.id <> 'R-5'}.
     *
     * <p>Expected output contains the matched pairs plus null-padded rows for the
     * right records without a match (R-5, R-8, R-20).
     */
    @Test
    public void testRowTimeRightOuterJoin() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-10", BoxesRunTime.boxToLong(10000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-12", BoxesRunTime.boxToLong(12000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-5", BoxesRunTime.boxToLong(5000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-20", BoxesRunTime.boxToLong(20000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$67(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$68(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 RIGHT OUTER JOIN T2 AS t2 ON\n        | t1.key = t2.key AND\n        | t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        | t2.rowtime + INTERVAL '6' SECOND AND\n        | t2.id <> 'R-5'\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,R-5,null", "A,R-6,L-1", "A,R-6,L-2", "A,R-6,L-6", "A,R-6,L-10", "A,R-6,L-12", "A,R-20,null", "B,R-7,L-4", "D,R-8,null"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Row-time RIGHT OUTER interval join whose time window is empty: the lower bound
     * ({@code t2.rowtime + 5s}) lies above the upper bound ({@code t2.rowtime + 1s}).
     *
     * <p>The expected output is every right row with the left-side column set to
     * null.
     */
    @Test
    public void testRowTimeRightOuterJoinNegativeIntervalSize() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$69(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$70(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 RIGHT OUTER JOIN T2 AS t2 ON\n        |t1.key = t2.key AND\n        |t1.rowtime BETWEEN t2.rowtime + INTERVAL '5' SECOND AND\n        |t2.rowtime + INTERVAL '1' SECOND\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,R-6,null", "B,R-7,null", "D,R-8,null"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Executes a processing-time FULL OUTER interval join of T1 and T2 on {@code a}
     * with parallelism 1; the time window runs from {@code t2.proctime - 5s} up to
     * {@code t2.proctime} itself.
     *
     * <p>The sink contents are never asserted; the test passes as long as the job
     * runs to completion (presumably because processing-time output is not
     * deterministic -- confirm against the original Scala test).
     */
    @Test
    public void testProcTimeFullOuterJoin() {
        env().setParallelism(1);

        // Left input: (Integer, Long, String) tuples.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "Hi1"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(2L), "Hi2"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(5L), "Hi3"));
        leftRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(7L), "Hi5"));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(1), BoxesRunTime.boxToLong(1L), "HiHi"));
        rightRows.$plus$eq(new Tuple3(BoxesRunTime.boxToInteger(2), BoxesRunTime.boxToLong(2L), "HeHe"));

        // Three data fields plus an appended processing-time attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$1), exprApi.symbol2FieldExpression(symbol$2), exprApi.symbol2FieldExpression(symbol$3), (Expression) exprApi.UnresolvedFieldExpression(symbol$4).proctime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$71(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$72(this))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.a, t2.c, t1.c\n        |FROM T1 as t1 FULL OUTER JOIN T2 as t2 ON\n        |t1.a = t2.a AND\n        |t1.proctime BETWEEN t2.proctime -  INTERVAL '5' SECOND AND\n        |t2.proctime\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();
    }

    /**
     * Row-time FULL OUTER interval join of T1 and T2 on {@code key}, with
     * {@code t1.rowtime} between {@code t2.rowtime - 5s} and {@code t2.rowtime + 6s}
     * and the extra predicate {@code NOT (t1.id = 'L-5' OR t2.id = 'R-5')}.
     *
     * <p>Expected output contains the matched pairs plus null-padded rows for the
     * unmatched records of either side.
     */
    @Test
    public void testRowTimeFullOuterJoin() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-2", BoxesRunTime.boxToLong(2000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-5", BoxesRunTime.boxToLong(5000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-6", BoxesRunTime.boxToLong(6000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-10", BoxesRunTime.boxToLong(10000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-12", BoxesRunTime.boxToLong(12000L)));
        leftRows.$plus$eq(new Tuple3("A", "L-20", BoxesRunTime.boxToLong(20000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-5", BoxesRunTime.boxToLong(5000L)));
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$73(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$74(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 FULL OUTER JOIN T2 AS t2 ON\n        |t1.key = t2.key AND\n        |t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |t2.rowtime + INTERVAL '6' SECOND AND\n        |NOT (t1.id = 'L-5' OR t2.id = 'R-5')\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"A,R-6,L-1", "A,R-6,L-2", "A,R-6,L-6", "A,R-6,L-10", "A,R-6,L-12", "B,R-7,L-4", "A,R-5,null", "D,R-8,null", "null,null,L-5", "null,null,L-7", "null,null,L-20"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Row-time FULL OUTER interval join whose time window is empty: the lower bound
     * ({@code t2.rowtime + 5s}) lies above the upper bound ({@code t2.rowtime + 4s}).
     *
     * <p>The expected output is every row of both inputs, each padded with nulls
     * for the opposite side.
     */
    @Test
    public void testRowTimeFullOuterJoinNegativeIntervalSize() {
        // Left input: three-field tuples; the query refers to them as key, id, rowtime.
        MutableList leftRows = new MutableList();
        leftRows.$plus$eq(new Tuple3("A", "L-1", BoxesRunTime.boxToLong(1000L)));
        leftRows.$plus$eq(new Tuple3("B", "L-4", BoxesRunTime.boxToLong(4000L)));
        leftRows.$plus$eq(new Tuple3("C", "L-7", BoxesRunTime.boxToLong(7000L)));

        // Right input, same tuple layout.
        MutableList rightRows = new MutableList();
        rightRows.$plus$eq(new Tuple3("A", "R-6", BoxesRunTime.boxToLong(6000L)));
        rightRows.$plus$eq(new Tuple3("B", "R-7", BoxesRunTime.boxToLong(7000L)));
        rightRows.$plus$eq(new Tuple3("D", "R-8", BoxesRunTime.boxToLong(8000L)));

        // Field expressions for both tables; the third field is declared as the rowtime attribute.
        org.apache.flink.table.api.package$ exprApi = org.apache.flink.table.api.package$.MODULE$;
        Expression[] leftFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};
        Expression[] rightFields = {exprApi.symbol2FieldExpression(symbol$5), exprApi.symbol2FieldExpression(symbol$6), (Expression) exprApi.UnresolvedFieldExpression(symbol$7).rowtime()};

        Table leftTable = package$.MODULE$.dataStreamConversions(env().fromCollection(leftRows, new IntervalJoinITCase$$anon$75(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(leftFields));
        Table rightTable = package$.MODULE$.dataStreamConversions(env().fromCollection(rightRows, new IntervalJoinITCase$$anon$76(this)).assignTimestampsAndWatermarks(new Row3WatermarkExtractor2())).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(rightFields));
        tEnv().registerTable("T1", leftTable);
        tEnv().registerTable("T2", rightTable);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t2.key, t2.id, t1.id\n        |FROM T1 AS t1 FULL OUTER JOIN T2 AS t2 ON\n        |t1.key = t2.key AND\n        |t1.rowtime BETWEEN t2.rowtime + INTERVAL '5' SECOND AND\n        |t2.rowtime + INTERVAL '4' SECOND\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        String[] expectedRows = {"null,null,L-1", "null,null,L-4", "null,null,L-7", "A,R-6,null", "B,R-7,null", "D,R-8,null"};
        Object expected = MutableList$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).toList().sorted(Ordering$String$.MODULE$);
        Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param stateBackendMode state backend mode, passed unchanged to the superclass
     *     constructor
     */
    public IntervalJoinITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode);
    }
}
