/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.harness;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.join.JoinType;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Types$;
import org.apache.flink.table.runtime.harness.HarnessTestBase;
import org.apache.flink.table.runtime.join.NonWindowFullJoin;
import org.apache.flink.table.runtime.join.NonWindowFullJoinWithNonEquiPredicates;
import org.apache.flink.table.runtime.join.NonWindowInnerJoin;
import org.apache.flink.table.runtime.join.NonWindowLeftRightJoin;
import org.apache.flink.table.runtime.join.NonWindowLeftRightJoinWithNonEquiPredicates;
import org.apache.flink.table.runtime.join.ProcTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.join.RowTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.types.CRow$;
import org.apache.flink.table.runtime.types.CRowTypeInfo;
import org.apache.flink.table.runtime.types.CRowTypeInfo$;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005=b\u0001B\u0001\u0003\u0001=\u0011qBS8j]\"\u000b'O\\3tgR+7\u000f\u001e\u0006\u0003\u0007\u0011\tq\u0001[1s]\u0016\u001c8O\u0003\u0002\u0006\r\u00059!/\u001e8uS6,'BA\u0004\t\u0003\u0015!\u0018M\u00197f\u0015\tI!\"A\u0003gY&t7N\u0003\u0002\f\u0019\u00051\u0011\r]1dQ\u0016T\u0011!D\u0001\u0004_J<7\u0001A\n\u0003\u0001A\u0001\"!\u0005\n\u000e\u0003\tI!a\u0005\u0002\u0003\u001f!\u000b'O\\3tgR+7\u000f\u001e\"bg\u0016DQ!\u0006\u0001\u0005\u0002Y\ta\u0001P5oSRtD#A\f\u0011\u0005E\u0001\u0001bB\r\u0001\u0005\u0004%IAG\u0001\fcV,'/_\"p]\u001aLw-F\u0001\u001c!\tabF\u0004\u0002\u001eY9\u0011ad\u000b\b\u0003?)r!\u0001I\u0015\u000f\u0005\u0005BcB\u0001\u0012(\u001d\t\u0019c%D\u0001%\u0015\t)c\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u00111\u0002D\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005\u00151\u0011BA\u0002\u0005\u0013\ti#!A\bICJtWm]:UKN$()Y:f\u0013\ty\u0003GA\u000bUKN$8\u000b\u001e:fC6\fV/\u001a:z\u0007>tg-[4\u000b\u00055\u0012\u0001B\u0002\u001a\u0001A\u0003%1$\u0001\u0007rk\u0016\u0014\u0018pQ8oM&<\u0007\u0005C\u00045\u0001\t\u0007I\u0011B\u001b\u0002\u000fI|w\u000fV=qKV\ta\u0007E\u00028}\u0001k\u0011\u0001\u000f\u0006\u0003si\n\u0001\u0002^=qK&tgm\u001c\u0006\u0003wq\naaY8n[>t'BA\u001f\t\u0003\r\t\u0007/[\u0005\u0003\u007fa\u0012q\u0002V=qK&sgm\u001c:nCRLwN\u001c\t\u0003\u0003\u0012k\u0011A\u0011\u0006\u0003\u0007\"\tQ\u0001^=qKNL!!\u0012\"\u0003\u0007I{w\u000f\u0003\u0004H\u0001\u0001\u0006IAN\u0001\te><H+\u001f9fA!9\u0011\n\u0001b\u0001\n\u0003Q\u0015\u0001\u00034v]\u000e\u001cu\u000eZ3\u0016\u0003-\u0003\"\u0001\u0014*\u000f\u00055\u0003\u0006CA\u0012O\u0015\u0005y\u0015!B:dC2\f\u0017BA)O\u0003\u0019\u0001&/\u001a3fM&\u00111\u000b\u0016\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005Es\u0005B\u0002,\u0001A\u0003%1*A\u0005gk:\u001c7i\u001c3fA!9\u0001\f\u0001b\u0001\n\u0003Q\u0015\u0001\u00074v]\u000e\u001cu\u000eZ3XSRDgj\u001c8FcV\fG\u000e\u0015:fI\"1!\f\u0001Q\u0001\n-\u000b\u0011DZ;oG\u000e{G-Z,ji\"tuN\\#rk\u0006d\u0007K]3eA!9A\f\u0001b\u
0001\n\u0003Q\u0015!\u00074v]\u000e\u001cu\u000eZ3XSRDgj\u001c8FcV\fG\u000e\u0015:fIJBaA\u0018\u0001!\u0002\u0013Y\u0015A\u00074v]\u000e\u001cu\u000eZ3XSRDgj\u001c8FcV\fG\u000e\u0015:fIJ\u0002\u0003\"\u00021\u0001\t\u0003\t\u0017!\n;fgR\u0004&o\\2US6,\u0017J\u001c8fe*{\u0017N\\,ji\"\u001cu.\\7p]\n{WO\u001c3t)\u0005\u0011\u0007CA2e\u001b\u0005q\u0015BA3O\u0005\u0011)f.\u001b;)\u0005};\u0007C\u00015l\u001b\u0005I'B\u00016\r\u0003\u0015QWO\\5u\u0013\ta\u0017N\u0001\u0003UKN$\b\"\u00028\u0001\t\u0003\t\u0017a\n;fgR\u0004&o\\2US6,\u0017J\u001c8fe*{\u0017N\\,ji\"tUmZ1uSZ,'i\\;oIND#!\\4\t\u000bE\u0004A\u0011A1\u0002IQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o/&$\bnQ8n[>t'i\\;oIND#\u0001]4\t\u000bQ\u0004A\u0011A1\u0002MQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o/&$\bNT3hCRLg/\u001a\"pk:$7\u000f\u000b\u0002tO\")q\u000f\u0001C\u0001C\u0006AB/Z:u%><H+[7f\u0019\u00164GoT;uKJTu.\u001b8)\u0005Y<\u0007\"\u0002>\u0001\t\u0003\t\u0017!\u0007;fgR\u0014vn\u001e+j[\u0016\u0014\u0016n\u001a5u\u001fV$XM\u001d&pS:D#!_4\t\u000bu\u0004A\u0011A1\u00021Q,7\u000f\u001e*poRKW.\u001a$vY2|U\u000f^3s\u0015>Lg\u000e\u000b\u0002}O\"1\u0011\u0011\u0001\u0001\u0005\u0002\u0005\fa\u0003^3ti:{gnV5oI><\u0018J\u001c8fe*{\u0017N\u001c\u0015\u0003\u007f\u001eDa!a\u0002\u0001\t\u0003\t\u0017!\t;fgRtuN\\,j]\u0012|w/\u00138oKJTu.\u001b8XSRD'+\u001a;sC\u000e$\bfAA\u0003O\"1\u0011Q\u0002\u0001\u0005\u0002\u0005\f\u0001\u0006^3ti:{gnV5oI><H*\u001a4u\u0015>LgnV5uQ>,HOT8o\u000bF,\u0018\r\u001c)sK\u0012D3!a\u0003h\u0011\u0019\t\u0019\u0002\u0001C\u0001C\u0006)C/Z:u\u001d>tw+\u001b8e_^dUM\u001a;K_&tw+\u001b;i\u001d>tW)];bYB\u0013X\r\u001a\u0015\u0004\u0003#9\u0007BBA\r\u0001\u0011\u0005\u0011-A\u0015uKN$hj\u001c8XS:$wn\u001e*jO\"$(j\\5o/&$\bn\\;u\u001d>tW)];bYB\u0013X\r\u001a\u0015\u0004\u0003/9\u0007BBA\u0010\u0001\u0011\u0005\u0011-\u0001\u0014uKN$hj\u001c8XS:$wn\u001e*jO\"$(j\\5o/&$\bNT8o\u000bF,\u0018\r\u001c)sK\u0012D3!!\bh\u0011\u0019\t)\u0003\u0001C\u0001C\u0006AC/Z:u\u001d>tw+\u001b8e_^4U\u000f\u001c7K_&tw+
\u001b;i_V$hj\u001c8FcV\fG\u000e\u0015:fI\"\u001a\u00111E4\t\r\u0005-\u0002\u0001\"\u0001b\u0003\u0015\"Xm\u001d;O_:<\u0016N\u001c3po\u001a+H\u000e\u001c&pS:<\u0016\u000e\u001e5O_:,\u0015/^1m!J,G\rK\u0002\u0002*\u001d\u0004")
public class JoinHarnessTest
extends HarnessTestBase {
    // Query configuration built from two Time values: 2 ms and 4 ms.
    // NOTE(review): presumably the min/max idle-state retention times — confirm against TestStreamQueryConfig.
    private final HarnessTestBase.TestStreamQueryConfig queryConfig = new HarnessTestBase.TestStreamQueryConfig(Time.milliseconds((long)2L), Time.milliseconds((long)4L));
    // Row type with two fields, in order: LONG then STRING. Passed as both input types to the join functions below.
    private final TypeInformation<Row> rowType = Types$.MODULE$.ROW((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TypeInformation[]{Types$.MODULE$.LONG(), Types$.MODULE$.STRING()}));
    // Java source text (margin-stripped via StringOps.stripMargin) of a class named TestJoinFunction that
    // extends RichFlatJoinFunction. Its join() copies fields 0 and 1 of the first input and fields 0 and 1
    // of the second input into a 4-field Row and collects that row unconditionally.
    private final String funcCode = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |\n      |   c.collect(out);\n      |\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();
    // Java source text (margin-stripped) of a TestJoinFunction variant with an extra non-equi predicate:
    // the 4-field output row is filled the same way as in funcCode, but it is collected only when field 1
    // of the first input, compared as a String, is greater than field 1 of the second input (compareTo > 0).
    private final String funcCodeWithNonEqualPred = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |   if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))>0) {\n      |      c.collect(out);\n      |   }\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();
    // Java source text (margin-stripped) of a TestJoinFunction variant with the opposite non-equi predicate
    // to funcCodeWithNonEqualPred: the output row is collected only when field 1 of the first input,
    // compared as a String, is less than field 1 of the second input (compareTo < 0).
    private final String funcCodeWithNonEqualPred2 = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |   if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))<0) {\n      |      c.collect(out);\n      |   }\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();

    /**
     * Accessor for the {@code queryConfig} field.
     *
     * @return the stream query configuration instance held by this test class
     */
    private HarnessTestBase.TestStreamQueryConfig queryConfig() {
        return queryConfig;
    }

    /**
     * Accessor for the {@code rowType} field.
     *
     * @return the row type information held by this test class
     */
    private TypeInformation<Row> rowType() {
        return rowType;
    }

    /**
     * Accessor for the {@code funcCode} field.
     *
     * @return the generated-function source text stored in {@code funcCode}
     */
    public String funcCode() {
        return funcCode;
    }

    /**
     * Accessor for the {@code funcCodeWithNonEqualPred} field.
     *
     * @return the generated-function source text stored in {@code funcCodeWithNonEqualPred}
     */
    public String funcCodeWithNonEqualPred() {
        return funcCodeWithNonEqualPred;
    }

    /**
     * Accessor for the {@code funcCodeWithNonEqualPred2} field.
     *
     * @return the generated-function source text stored in {@code funcCodeWithNonEqualPred2}
     */
    public String funcCodeWithNonEqualPred2() {
        return funcCodeWithNonEqualPred2;
    }

    /**
     * Drives a {@code ProcTimeBoundedStreamJoin} (INNER, bound arguments -10 and 20) through a keyed
     * two-input test harness, checking the number of processing-time timers and keyed state entries
     * after selected steps and finally the exact sequence of joined records.
     */
    @Test
    public void testProcTimeInnerJoinWithCommonBounds() {
        // Join function under test; both inputs use rowType and the generated code is funcCode (emits every pair).
        ProcTimeBoundedStreamJoin joinProcessFunc = new ProcTimeBoundedStreamJoin(JoinType.INNER, -10L, 20L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode());
        KeyedCoProcessOperator operator = new KeyedCoProcessOperator((CoProcessFunction)joinProcessFunc);
        // Both inputs use TupleRowKeySelector(0) with key type INT.
        // NOTE(review): presumably the selector keys on row field 0 — confirm in HarnessTestBase.
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), Types$.MODULE$.INT(), 1, 1, 0);
        testHarness.open();
        // Processing time 1: first-input row (1, "1a1"); one timer is expected afterwards.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a1"})), 1L));
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        // Processing time 2: first-input row (2, "2a2"); two timers are expected afterwards.
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a2"})), 2L));
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        // Processing time 3: a second first-input row with value 1; the timer count is expected to stay at 2.
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a3"})), 3L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        // Second-input rows: (1, "1b3") at time 3 and (2, "2b4") at time 4.
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1b3"})), 3L));
        testHarness.setProcessingTime(4L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2b4"})), 4L));
        Assert.assertEquals((long)8L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
        // Processing time 13: second-input row (1, "1b13").
        testHarness.setProcessingTime(13L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1b13"})), 13L));
        // Advancing the clock to 33 is expected to shrink state from 8 to 4 entries and timers from 4 to 2.
        testHarness.setProcessingTime(33L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        // Three more rows, all at processing time 33.
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a33"})), 33L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a33"})), 33L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2b33"})), 33L));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        // Expected joined rows in emission order. Note there is no ("1a1", "1b13") pair and no
        // ("2a33", "2b4") pair in the expectation.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a1", Predef$.MODULE$.long2Long(1L), "1b3"})), 3L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a3", Predef$.MODULE$.long2Long(1L), "1b3"})), 3L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a2", Predef$.MODULE$.long2Long(2L), "2b4"})), 4L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a3", Predef$.MODULE$.long2Long(1L), "1b13"})), 13L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a33", Predef$.MODULE$.long2Long(1L), "1b13"})), 33L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a33", Predef$.MODULE$.long2Long(2L), "2b33"})), 33L));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@code ProcTimeBoundedStreamJoin} (INNER, bound arguments -10 and -5, i.e. both negative)
     * through a keyed two-input test harness. Checks keyed state entry and processing-time timer counts
     * as the clock advances, down to zero of each at time 16, and then the exact joined output.
     */
    @Test
    public void testProcTimeInnerJoinWithNegativeBounds() {
        ProcTimeBoundedStreamJoin joinProcessFunc = new ProcTimeBoundedStreamJoin(JoinType.INNER, -10L, -5L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode());
        KeyedCoProcessOperator operator = new KeyedCoProcessOperator((CoProcessFunction)joinProcessFunc);
        // Both inputs use TupleRowKeySelector(0) with key type INT.
        // NOTE(review): presumably the selector keys on row field 0 — confirm in HarnessTestBase.
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), Types$.MODULE$.INT(), 1, 1, 0);
        testHarness.open();
        // Three first-input rows at processing times 1, 2 and 3.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a1"})), 1L));
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a2"})), 2L));
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a3"})), 3L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        // Second-input rows at times 3, 7 and 12: the state-entry and timer counts are expected to stay
        // unchanged (4 and 2) after each of them.
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1b3"})), 3L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(7L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2b7"})), 7L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(12L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1b12"})), 12L));
        // Clock-only steps: counts are expected to be 4/2 at time 13, 2/1 at time 14 and 0/0 at time 16.
        testHarness.setProcessingTime(13L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(14L);
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(16L);
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
        ConcurrentLinkedQueue result = testHarness.getOutput();
        // Only two joined rows are expected: ("2a2", "2b7") and ("1a3", "1b12").
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "2a2", Predef$.MODULE$.long2Long(2L), "2b7"})), 7L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "1a3", Predef$.MODULE$.long2Long(1L), "1b12"})), 12L));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@code RowTimeBoundedStreamJoin} (INNER, bound arguments -10 and 20) wrapped in a
     * {@code KeyedCoProcessOperatorWithWatermarkDelay} through a keyed two-input test harness. Checks
     * event-time timer and keyed state entry counts, then the exact interleaving of output records
     * and watermarks.
     */
    @Test
    public void testRowTimeInnerJoinWithCommonBounds() {
        // NOTE(review): the 0L after the bounds and the trailing (0, 0) are presumably the allowed lateness
        // and the row-time field indices of the two inputs — confirm against the RowTimeBoundedStreamJoin constructor.
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(JoinType.INNER, -10L, 20L, 0L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), 0, 0);
        // The operator is configured with the delay reported by the join function itself.
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)joinProcessFunc, joinProcessFunc.getMaxOutputDelay());
        // Both inputs use TupleRowKeySelector(1) with key type STRING, so the "k1"/"k2" values act as keys.
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(1), new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING(), 1, 1, 0);
        testHarness.open();
        // Watermark 1 on both inputs, then rows for key "k1". Every StreamRecord carries timestamp 0;
        // the varying value is row field 0.
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "k1"})), 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(5L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(15L), "k1"})), 0L));
        // Watermark 20 on both inputs; the state-entry count is still expected to be 4.
        testHarness.processWatermark1(new Watermark(20L));
        testHarness.processWatermark2(new Watermark(20L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(35L), "k1"})), 0L));
        // Watermark 38 on both inputs, then one row per input for a second key, "k2".
        testHarness.processWatermark1(new Watermark(38L));
        testHarness.processWatermark2(new Watermark(38L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(40L), "k2"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(39L), "k2"})), 0L));
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        // Watermark 61 on both inputs; the state-entry count is expected to drop from 6 to 4.
        testHarness.processWatermark1(new Watermark(61L));
        testHarness.processWatermark2(new Watermark(61L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        // Expected output. Each expected watermark is the corresponding input watermark minus 20
        // (1 -> -19, 20 -> 0, 38 -> 18, 61 -> 41).
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(-19L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "k1", Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1", Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(5L), "k1", Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(5L), "k1", Predef$.MODULE$.long2Long(15L), "k1"})), 0L));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(35L), "k1", Predef$.MODULE$.long2Long(15L), "k1"})), 0L));
        expectedOutput.add(new Watermark(18L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(40L), "k2", Predef$.MODULE$.long2Long(39L), "k2"})), 0L));
        expectedOutput.add(new Watermark(41L));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        this.verifyWithWatermarks(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@code RowTimeBoundedStreamJoin} (INNER, bound arguments -10 and -7, i.e. both negative)
     * wrapped in a {@code KeyedCoProcessOperatorWithWatermarkDelay} through a keyed two-input test
     * harness. Checks keyed state entry counts as watermarks advance, down to zero after watermark 18,
     * and then the exact interleaving of output records and watermarks.
     */
    @Test
    public void testRowTimeInnerJoinWithNegativeBounds() {
        // NOTE(review): the 0L after the bounds and the trailing (0, 0) are presumably the allowed lateness
        // and the row-time field indices of the two inputs — confirm against the RowTimeBoundedStreamJoin constructor.
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(JoinType.INNER, -10L, -7L, 0L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), 0, 0);
        // The operator is configured with the delay reported by the join function itself.
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)joinProcessFunc, joinProcessFunc.getMaxOutputDelay());
        // Both inputs use TupleRowKeySelector(1) with key type STRING, so the "k1" values act as keys.
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(1), new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING(), 1, 1, 0);
        testHarness.open();
        // Watermark 1 on both inputs, then a second-input row with value 2: no state entry is expected.
        testHarness.processWatermark1(new Watermark(1L));
        testHarness.processWatermark2(new Watermark(1L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        // Watermark 2 on both inputs, then four rows (two per input); 4 state entries are expected afterwards.
        testHarness.processWatermark1(new Watermark(2L));
        testHarness.processWatermark2(new Watermark(2L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(3L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(3L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(13L), "k1"})), 0L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6L), "k1"})), 0L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        // Watermark 10 is expected to leave 2 state entries, and watermark 18 none.
        testHarness.processWatermark1(new Watermark(10L));
        testHarness.processWatermark2(new Watermark(10L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        // Expected output. Each expected watermark is the corresponding input watermark minus 10
        // (1 -> -9, 2 -> -8, 10 -> 0, 18 -> 8). Only the first-input rows 3 and 6 paired with the
        // second-input row 13 are expected as joined records.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(-9L));
        expectedOutput.add(new Watermark(-8L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(3L), "k1", Predef$.MODULE$.long2Long(13L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6L), "k1", Predef$.MODULE$.long2Long(13L), "k1"})), 0L));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new Watermark(8L));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        this.verifyWithWatermarks(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testRowTimeLeftOuterJoin() {
        RowTimeBoundedStreamJoin joinProcessFunc = new RowTimeBoundedStreamJoin(JoinType.LEFT_OUTER, -5L, 9L, 0L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), 0, 0);
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)joinProcessFunc, joinProcessFunc.getMaxOutputDelay());
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(1), new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING(), 1, 1, 0);
        testHarness.open();
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(14L));
        testHarness.processWatermark2(new Watermark(14L));
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.processWatermark1(new Watermark(18L));
        testHarness.processWatermark2(new Watermark(18L));
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(19L), "k1"})), 0L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(26L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(25L), "k1"})), 0L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(21L), "k1"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(39L), "k2"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(40L), "k2"})), 0L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(50L), "k2"})), 0L));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(49L), "k2"})), 0L));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(41L), "k2"})), 0L));
        testHarness.processWatermark1(new Watermark(100L));
        testHarness.processWatermark2(new Watermark(100L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "k1", null, null})), 14L));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "k1", null, null})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20L), "k1", Predef$.MODULE$.long2Long(25L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(21L), "k1", Predef$.MODULE$.long2Long(25L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(21L), "k1", Predef$.MODULE$.long2Long(26L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(49L), "k2", Predef$.MODULE$.long2Long(40L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(49L), "k2", Predef$.MODULE$.long2Long(41L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(50L), "k2", Predef$.MODULE$.long2Long(41L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(19L), "k1", null, null})), 32L));
        expectedOutput.add(new Watermark(91L));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        this.verifyWithWatermarks(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@code RowTimeBoundedStreamJoin} built with {@code JoinType.RIGHT_OUTER}
     * through a keyed two-input harness (both inputs keyed via {@code TupleRowKeySelector(1)}).
     *
     * <p>Rows are interleaved on both inputs with watermark advances; after each step the
     * number of event-time timers and keyed state entries is asserted. At the end the full
     * operator output (records and watermarks) is compared with a hand-written expectation.
     */
    @Test
    public void testRowTimeRightOuterJoin() {
        // Scala module singletons, read once to keep the statements below short.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rows = CRow$.MODULE$;

        // NOTE(review): -5L / 9L / 0L are presumably the lower bound, upper bound and allowed lateness - confirm.
        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.RIGHT_OUTER, -5L, 9L, 0L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), 0, 0);
        KeyedCoProcessOperatorWithWatermarkDelay joinOperator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)joinFunction, joinFunction.getMaxOutputDelay());
        HarnessTestBase.TupleRowKeySelector firstKeySelector = new HarnessTestBase.TupleRowKeySelector(1);
        HarnessTestBase.TupleRowKeySelector secondKeySelector = new HarnessTestBase.TupleRowKeySelector(1);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstKeySelector, secondKeySelector, Types$.MODULE$.STRING(), 1, 1, 0);
        harness.open();

        // One row per input under different keys: two timers and four state entries expected.
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(1L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals(2L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());

        // Watermark 14 on both inputs: one timer and two state entries remain.
        harness.processWatermark1(new Watermark(14L));
        harness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(2L, (long)harness.numKeyedStateEntries());

        // Watermark 18 on both inputs: no timers and no state remain.
        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());

        // Rows with value 2 arrive after watermark 18; the assertions require that they leave no state or timers.
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numEventTimeTimers());

        // Interleaved rows for key "k1" ...
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(19L), "k1"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(20L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(26L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(25L), "k1"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1"})), 0L));
        // ... and for key "k2".
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(39L), "k2"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(50L), "k2"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(41L), "k2"})), 0L));

        // Final watermark on both inputs.
        harness.processWatermark1(new Watermark(100L));
        harness.processWatermark2(new Watermark(100L));

        ConcurrentLinkedQueue actual = harness.getOutput();

        // Expected output: watermarks, rows with the first two fields null, and matched pairs.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new Watermark(5L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(1L), "k2"})), 18L));
        expected.add(new Watermark(9L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(2L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(20L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1", predef.long2Long(26L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2", predef.long2Long(40L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(50L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(39L), "k2"})), 56L));
        expected.add(new Watermark(91L));

        this.verifyWithWatermarks(expected, actual);
        harness.close();
    }

    /**
     * Drives a {@code RowTimeBoundedStreamJoin} built with {@code JoinType.FULL_OUTER}
     * through a keyed two-input harness (both inputs keyed via {@code TupleRowKeySelector(1)}).
     *
     * <p>Rows are interleaved on both inputs with watermark advances; after each step the
     * number of event-time timers and keyed state entries is asserted. At the end the full
     * operator output (records and watermarks) is compared with a hand-written expectation
     * that contains null-padded rows for both inputs.
     */
    @Test
    public void testRowTimeFullOuterJoin() {
        // Scala module singletons, read once to keep the statements below short.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rows = CRow$.MODULE$;

        // NOTE(review): -5L / 9L / 0L are presumably the lower bound, upper bound and allowed lateness - confirm.
        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.FULL_OUTER, -5L, 9L, 0L, this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), 0, 0);
        KeyedCoProcessOperatorWithWatermarkDelay joinOperator = new KeyedCoProcessOperatorWithWatermarkDelay((CoProcessFunction)joinFunction, joinFunction.getMaxOutputDelay());
        HarnessTestBase.TupleRowKeySelector firstKeySelector = new HarnessTestBase.TupleRowKeySelector(1);
        HarnessTestBase.TupleRowKeySelector secondKeySelector = new HarnessTestBase.TupleRowKeySelector(1);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstKeySelector, secondKeySelector, Types$.MODULE$.STRING(), 1, 1, 0);
        harness.open();

        // One row per input under different keys: two timers and four state entries expected.
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(1L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals(2L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());

        // Watermark 14 on both inputs: one timer and two state entries remain.
        harness.processWatermark1(new Watermark(14L));
        harness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(2L, (long)harness.numKeyedStateEntries());

        // Watermark 18 on both inputs: no timers and no state remain.
        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, (long)harness.numEventTimeTimers());
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());

        // Rows with value 2 arrive after watermark 18; the assertions require that they leave no state or timers.
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numEventTimeTimers());

        // Interleaved rows for key "k1" ...
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(19L), "k1"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(20L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(26L), "k1"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(25L), "k1"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1"})), 0L));
        // ... and for key "k2".
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(39L), "k2"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(50L), "k2"})), 0L));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2"})), 0L));
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(41L), "k2"})), 0L));

        // Final watermark on both inputs.
        harness.processWatermark1(new Watermark(100L));
        harness.processWatermark2(new Watermark(100L));

        ConcurrentLinkedQueue actual = harness.getOutput();

        // Expected output: watermarks, null-padded rows from either input, and matched pairs.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(1L), "k1", null, null})), 14L));
        expected.add(new Watermark(5L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(1L), "k2"})), 18L));
        expected.add(new Watermark(9L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(2L), "k1", null, null})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(2L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(20L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(21L), "k1", predef.long2Long(26L), "k1"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2", predef.long2Long(40L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(49L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(50L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.long2Long(19L), "k1", null, null})), 32L));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{null, null, predef.long2Long(39L), "k2"})), 56L));
        expected.add(new Watermark(91L));

        this.verifyWithWatermarks(expected, actual);
        harness.close();
    }

    /**
     * Drives a {@code NonWindowInnerJoin} through a keyed two-input harness (both inputs
     * keyed via {@code TupleRowKeySelector(0)}) under manually advanced processing time.
     *
     * <p>After each input row or clock advance the test asserts the number of registered
     * processing-time timers and keyed state entries, ending with both at zero once the
     * clock reaches 11. The emitted records are then compared with the expected join results.
     */
    @Test
    public void testNonWindowInnerJoin() {
        NonWindowInnerJoin joinProcessFunc = new NonWindowInnerJoin(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator operator = new KeyedCoProcessOperator((CoProcessFunction)joinProcessFunc);
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        testHarness.open();

        // t=1: first row on input 1 for key 1.
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());

        // t=2: a duplicate row for key 1 and a first row for key 2 on input 1.
        testHarness.setProcessingTime(2L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());

        // t=3: a third identical row on input 1 leaves the counts unchanged; then input 2 gets its first row.
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());

        // t=4: input 2 row for key 2.
        testHarness.setProcessingTime(4L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello1"}))));
        Assert.assertEquals((long)8L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());

        // t=5: another input 2 row for key 1; the asserted counts drop from 8/4 to 6/3 at this step.
        testHarness.setProcessingTime(5L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());

        // Advancing the clock without new input keeps shrinking state and timers.
        testHarness.setProcessingTime(6L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        testHarness.setProcessingTime(8L);
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());

        // Some state must still be present at t=10 and everything must be gone at t=11.
        testHarness.setProcessingTime(10L);
        Assert.assertTrue(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(11L);
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());

        // Expected output: three (1, aaa, 1, Hi1) rows followed by one (2, bbb, 2, Hello1) row.
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", Predef$.MODULE$.int2Integer(2), "Hello1"}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@code NonWindowInnerJoin} through a keyed two-input harness while mixing
     * rows created with {@code CRow.apply(values)} and rows created with
     * {@code CRow.apply(false, values)} (presumably retractions - confirm against {@code CRow}).
     *
     * <p>Processing time is advanced manually; after each step the number of
     * processing-time timers and keyed state entries is asserted, and the emitted records
     * are finally compared with the expected output.
     */
    @Test
    public void testNonWindowInnerJoinWithRetract() {
        // Scala module singletons, read once to keep the statements below short.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rows = CRow$.MODULE$;

        NonWindowInnerJoin joinFunction = new NonWindowInnerJoin(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator joinOperator = new KeyedCoProcessOperator((CoProcessFunction)joinFunction);
        HarnessTestBase.TupleRowKeySelector firstKeySelector = new HarnessTestBase.TupleRowKeySelector(0);
        HarnessTestBase.TupleRowKeySelector secondKeySelector = new HarnessTestBase.TupleRowKeySelector(0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstKeySelector, secondKeySelector, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // t=1: first row on input 1 for key 1.
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, (long)harness.numKeyedStateEntries());

        // t=2: a duplicate row for key 1 and a first row for key 2 on input 1.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());

        // t=3: a change=false row for (1, aaa) on input 1 leaves the counts unchanged.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());

        // Input 2 receives (1, Hi1) and immediately its change=false counterpart.
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // t=4: input 2 row for key 2.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(7L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(4L, (long)harness.numProcessingTimeTimers());

        // A second change=false row for (1, aaa) on input 1, still at t=4.
        harness.processElement1(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));

        // t=5: input 2 receives (1, Hi2) followed by its change=false counterpart.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Advancing the clock without new input drains state and timers down to zero at t=8.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numProcessingTimeTimers());

        // Expected output: (1, aaa, 1, Hi1), its change=false counterpart, then (2, bbb, 2, Hello1).
        ConcurrentLinkedQueue actual = harness.getOutput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb", predef.int2Integer(2), "Hello1"}))));

        this.verify(expected, actual);
        harness.close();
    }

    /**
     * Drives a {@code NonWindowLeftRightJoin} (constructed with its boolean flag set to
     * {@code true}; presumably "is left join" - confirm against the constructor) through a
     * keyed two-input harness, mixing rows created with {@code CRow.apply(values)} and
     * {@code CRow.apply(false, values)}.
     *
     * <p>Processing time is advanced manually; after each step the number of
     * processing-time timers and keyed state entries is asserted. The emitted records,
     * including rows whose last two fields are null, are finally compared with the
     * expected output.
     */
    @Test
    public void testNonWindowLeftJoinWithoutNonEqualPred() {
        // Scala module singletons, read once to keep the statements below short.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rows = CRow$.MODULE$;

        NonWindowLeftRightJoin joinFunction = new NonWindowLeftRightJoin(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), true, (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator joinOperator = new KeyedCoProcessOperator((CoProcessFunction)joinFunction);
        HarnessTestBase.TupleRowKeySelector firstKeySelector = new HarnessTestBase.TupleRowKeySelector(0);
        HarnessTestBase.TupleRowKeySelector secondKeySelector = new HarnessTestBase.TupleRowKeySelector(0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstKeySelector, secondKeySelector, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // t=1: first row on input 1 for key 1.
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, (long)harness.numKeyedStateEntries());

        // t=2: a duplicate row for key 1 and a first row for key 2 on input 1.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());

        // t=3: a change=false row for (1, aaa) on input 1 leaves the counts unchanged.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());

        // Input 2 receives (1, Hi1) and immediately its change=false counterpart.
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // t=4: input 2 row for key 2.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(7L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(4L, (long)harness.numProcessingTimeTimers());

        // A second change=false row for (1, aaa) on input 1, still at t=4.
        harness.processElement1(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa"}))));

        // t=5: input 2 receives (1, Hi2) followed by its change=false counterpart.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Advancing the clock without new input drains state and timers down to zero at t=8.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numProcessingTimeTimers());

        // Expected output, in emission order: null-padded input 1 rows, their change=false
        // counterparts, and the matched rows.
        ConcurrentLinkedQueue actual = harness.getOutput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)rows.apply((Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(2), "bbb", predef.int2Integer(2), "Hello1"}))));
        expected.add(new StreamRecord((Object)rows.apply(false, (Seq)predef.genericWrapArray((Object)new Object[]{predef.int2Integer(1), "aaa", null, null}))));

        this.verify(expected, actual);
        harness.close();
    }

    /**
     * Runs a {@link NonWindowLeftRightJoinWithNonEquiPredicates} function, built from
     * {@code funcCodeWithNonEqualPred()} with its boolean constructor argument set to
     * {@code true}, inside a keyed two-input test harness that uses
     * {@code TupleRowKeySelector(0)} for both inputs.
     *
     * <p>After each group of records the test asserts the number of registered processing-time
     * timers and keyed state entries; at the end it compares everything the operator emitted
     * with a hand-written expected queue.
     *
     * <p>NOTE(review): the {@code true} flag presumably selects left-join behaviour, and records
     * created through {@code apply(false, ...)} presumably represent retractions - confirm
     * against the operator and {@code CRow} sources.
     */
    @Test
    public void testNonWindowLeftJoinWithNonEqualPred() {
        NonWindowLeftRightJoinWithNonEquiPredicates joinFunction = new NonWindowLeftRightJoinWithNonEquiPredicates(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCodeWithNonEqualPred(), true, (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator joinOperator = new KeyedCoProcessOperator((CoProcessFunction)joinFunction);
        HarnessTestBase.TupleRowKeySelector firstInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        HarnessTestBase.TupleRowKeySelector secondInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstInputKeys, secondInputKeys, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: three records for key 1 on the first input.
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(1L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());

        // Processing time 2: more first-input records, now also for key 2.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(6L, (long)harness.numKeyedStateEntries());

        // Processing time 3: the second input starts delivering records for key 1.
        harness.setProcessingTime(3L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(8L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Processing time 4: second-input records for key 2, then a mix of records for key 1.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello"}))));
        Assert.assertEquals(10L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(4L, (long)harness.numProcessingTimeTimers());
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));

        // Processing time 5: one more add/remove pair on the second input; the counts are expected to have dropped.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        Assert.assertEquals(6L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Only the clock advances from here on; state and timer counts are expected to shrink to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numProcessingTimeTimers());

        // Compare the emitted records with the expected sequence.
        ConcurrentLinkedQueue actualOutput = harness.getOutput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", Predef$.MODULE$.int2Integer(2), "Hello"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        this.verify(expected, actualOutput);
        harness.close();
    }

    /**
     * Runs a {@link NonWindowLeftRightJoin} function, built from {@code funcCode()} with its
     * boolean constructor argument set to {@code false}, inside a keyed two-input test harness
     * that uses {@code TupleRowKeySelector(0)} for both inputs.
     *
     * <p>Records arrive on the second input first and on the first input afterwards. After each
     * group the test asserts the number of registered processing-time timers and keyed state
     * entries; at the end it compares the emitted records with a hand-written expected queue.
     *
     * <p>NOTE(review): the {@code false} flag presumably selects right-join behaviour, and
     * records created through {@code apply(false, ...)} presumably represent retractions -
     * confirm against the operator and {@code CRow} sources.
     */
    @Test
    public void testNonWindowRightJoinWithoutNonEqualPred() {
        NonWindowLeftRightJoin joinFunction = new NonWindowLeftRightJoin(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), false, (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator joinOperator = new KeyedCoProcessOperator((CoProcessFunction)joinFunction);
        HarnessTestBase.TupleRowKeySelector firstInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        HarnessTestBase.TupleRowKeySelector secondInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstInputKeys, secondInputKeys, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: a single record for key 1 on the second input.
        harness.setProcessingTime(1L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, (long)harness.numKeyedStateEntries());

        // Processing time 2: the same row again plus a first record for key 2.
        harness.setProcessingTime(2L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());

        // Processing time 3: an apply(false, ...) record on the second input, then an add/remove pair on the first input.
        harness.setProcessingTime(3L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Processing time 4: a first-input record for key 2, then another apply(false, ...) record on the second input.
        harness.setProcessingTime(4L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(7L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(4L, (long)harness.numProcessingTimeTimers());
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));

        // Processing time 5: one more add/remove pair on the first input; the counts are expected to have dropped.
        harness.setProcessingTime(5L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(5L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Only the clock advances from here on; state and timer counts are expected to shrink to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numProcessingTimeTimers());

        // Compare the emitted records with the expected sequence.
        ConcurrentLinkedQueue actualOutput = harness.getOutput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello1", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        this.verify(expected, actualOutput);
        harness.close();
    }

    /**
     * Runs a {@link NonWindowLeftRightJoinWithNonEquiPredicates} function, built from
     * {@code funcCodeWithNonEqualPred2()} with its boolean constructor argument set to
     * {@code false}, inside a keyed two-input test harness that uses
     * {@code TupleRowKeySelector(0)} for both inputs.
     *
     * <p>Records arrive on the second input first and on the first input afterwards. After each
     * group the test asserts the number of registered processing-time timers and keyed state
     * entries; at the end it compares the emitted records with a hand-written expected queue.
     *
     * <p>NOTE(review): the {@code false} flag presumably selects right-join behaviour, and
     * records created through {@code apply(false, ...)} presumably represent retractions -
     * confirm against the operator and {@code CRow} sources.
     */
    @Test
    public void testNonWindowRightJoinWithNonEqualPred() {
        NonWindowLeftRightJoinWithNonEquiPredicates joinFunction = new NonWindowLeftRightJoinWithNonEquiPredicates(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCodeWithNonEqualPred2(), false, (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator joinOperator = new KeyedCoProcessOperator((CoProcessFunction)joinFunction);
        HarnessTestBase.TupleRowKeySelector firstInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        HarnessTestBase.TupleRowKeySelector secondInputKeys = new HarnessTestBase.TupleRowKeySelector(0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)joinOperator, firstInputKeys, secondInputKeys, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: three records for key 1 on the second input.
        harness.setProcessingTime(1L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(1L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());

        // Processing time 2: more second-input records, now also for key 2.
        harness.setProcessingTime(2L);
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        Assert.assertEquals(6L, (long)harness.numKeyedStateEntries());

        // Processing time 3: the first input starts delivering records for key 1.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(8L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Processing time 4: first-input records for key 2, then a mix of records for key 1.
        harness.setProcessingTime(4L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello"}))));
        Assert.assertEquals(10L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(4L, (long)harness.numProcessingTimeTimers());
        harness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));

        // Processing time 5: one more add/remove pair on the first input; the counts are expected to have dropped.
        harness.setProcessingTime(5L);
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        harness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        Assert.assertEquals(6L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(3L, (long)harness.numProcessingTimeTimers());

        // Only the clock advances from here on; state and timer counts are expected to shrink to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(2L, (long)harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, (long)harness.numKeyedStateEntries());
        Assert.assertEquals(0L, (long)harness.numProcessingTimeTimers());

        // Compare the emitted records with the expected sequence.
        ConcurrentLinkedQueue actualOutput = harness.getOutput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "Hello", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        this.verify(expected, actualOutput);
        harness.close();
    }

    @Test
    public void testNonWindowFullJoinWithoutNonEqualPred() {
        NonWindowFullJoin joinProcessFunc = new NonWindowFullJoin(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCode(), (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator operator = new KeyedCoProcessOperator((CoProcessFunction)joinProcessFunc);
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        testHarness.open();
        testHarness.setProcessingTime(1L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc"}))));
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(2L);
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(3L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd"}))));
        Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
        Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)8L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(4L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd"}))));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
        Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(5L);
        Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(6L);
        Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(7L);
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.setProcessingTime(8L);
        testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    /**
     * Drives a {@link NonWindowFullJoinWithNonEquiPredicates} function through a keyed two-input
     * test harness, using the generated code returned by {@code funcCodeWithNonEqualPred2()}.
     *
     * <p>The test feeds rows to both inputs while stepping processing time from 1 to 8, checks the
     * number of registered processing-time timers and keyed state entries after each step, and
     * finally hands the harness output and the expected records to {@code verify}.
     *
     * <p>Rows built with the boolean flag {@code false} are presumably retraction messages
     * (NOTE(review): confirm against {@code CRow}).
     *
     * <p>Everything after {@code open()} runs inside {@code try/finally} so that the harness is
     * closed even when an assertion fails.
     */
    @Test
    public void testNonWindowFullJoinWithNonEqualPred() {
        NonWindowFullJoinWithNonEquiPredicates joinProcessFunc = new NonWindowFullJoinWithNonEquiPredicates(this.rowType(), this.rowType(), "TestJoinFunction", this.funcCodeWithNonEqualPred2(), (StreamQueryConfig)this.queryConfig());
        KeyedCoProcessOperator operator = new KeyedCoProcessOperator((CoProcessFunction)joinProcessFunc);
        // Both inputs are keyed on field 0 of the row, an Integer.
        KeyedTwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness((TwoInputStreamOperator)operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        testHarness.open();
        try {
            // Time 1: two rows with key 1 on the first input.
            testHarness.setProcessingTime(1L);
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc"}))));
            Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)3L, (long)testHarness.numKeyedStateEntries());

            // Time 2: two rows with key 2 on the second input.
            testHarness.setProcessingTime(2L);
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
            Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());

            // Time 3: key 2 arrives on the first input, then key 1 on the second input.
            testHarness.setProcessingTime(3L);
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd"}))));
            Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)9L, (long)testHarness.numKeyedStateEntries());
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
            Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)12L, (long)testHarness.numKeyedStateEntries());

            // Time 4: one row per input is sent again with the flag false; the expected timer
            // and state counts stay unchanged.
            testHarness.setProcessingTime(4L);
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
            Assert.assertEquals((long)4L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)12L, (long)testHarness.numKeyedStateEntries());

            // Times 5 to 7: no new input; the expected timer and state counts shrink as
            // processing time advances and reach zero at time 7.
            testHarness.setProcessingTime(5L);
            Assert.assertEquals((long)3L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)9L, (long)testHarness.numKeyedStateEntries());
            testHarness.setProcessingTime(6L);
            Assert.assertEquals((long)2L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)6L, (long)testHarness.numKeyedStateEntries());
            testHarness.setProcessingTime(7L);
            Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
            Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());

            // Time 8: one fresh row per input after the counts have dropped to zero.
            testHarness.setProcessingTime(8L);
            testHarness.processElement1(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
            testHarness.processElement2(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));

            ConcurrentLinkedQueue result = testHarness.getOutput();
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            // Rows of the first input (key 1) padded with nulls on the second-input side.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
            // Rows of the second input (key 2) padded with nulls on the first-input side.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
            // Key 2: the padded rows are withdrawn (flag false) and replaced by rows joined
            // with (2, "aaa"); (2, "ddd") is expected only as a null-padded row.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", null, null}))));
            // Key 1: the padded rows are withdrawn (flag false) and replaced by rows joined
            // with (1, "ddd"); (1, "aaa") is expected only as a null-padded row.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
            // Time-4 input for (2, "aaa"): its joined rows are withdrawn and the second-input
            // rows are expected null-padded again.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
            // Time-4 input for (1, "ddd"): its joined rows are withdrawn and the first-input
            // rows are expected null-padded again.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply(false, (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
            // Rows sent at time 8 are expected only as null-padded rows.
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
            expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
            this.verify(expectedOutput, result);
        } finally {
            // Release the harness even when an assertion above throws.
            testHarness.close();
        }
    }
}

