package org.apache.flink.table.runtime.harness;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.join.JoinType;
import org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.runtime.harness.HarnessTestBase;
import org.apache.flink.table.runtime.join.NonWindowFullJoin;
import org.apache.flink.table.runtime.join.NonWindowFullJoinWithNonEquiPredicates;
import org.apache.flink.table.runtime.join.NonWindowInnerJoin;
import org.apache.flink.table.runtime.join.NonWindowLeftRightJoin;
import org.apache.flink.table.runtime.join.NonWindowLeftRightJoinWithNonEquiPredicates;
import org.apache.flink.table.runtime.join.ProcTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.join.RowTimeBoundedStreamJoin;
import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.types.CRow$;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import scala.Predef$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

/* compiled from: JoinHarnessTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055b\u0001B\u0001\u0003\u0001=\u0011qBS8j]\"\u000b'O\\3tgR+7\u000f\u001e\u0006\u0003\u0007\u0011\tq\u0001[1s]\u0016\u001c8O\u0003\u0002\u0006\r\u00059!/\u001e8uS6,'BA\u0004\t\u0003\u0015!\u0018M\u00197f\u0015\tI!\"A\u0003gY&t7N\u0003\u0002\f\u0019\u00051\u0011\r]1dQ\u0016T\u0011!D\u0001\u0004_J<7\u0001A\n\u0003\u0001A\u0001\"!\u0005\n\u000e\u0003\tI!a\u0005\u0002\u0003\u001f!\u000b'O\\3tgR+7\u000f\u001e\"bg\u0016DQ!\u0006\u0001\u0005\u0002Y\ta\u0001P5oSRtD#A\f\u0011\u0005E\u0001\u0001bB\r\u0001\u0005\u0004%IAG\u0001\u0007G>tg-[4\u0016\u0003m\u0001\"\u0001\b\u0018\u000f\u0005uacB\u0001\u0010,\u001d\ty\"F\u0004\u0002!S9\u0011\u0011\u0005\u000b\b\u0003E\u001dr!a\t\u0014\u000e\u0003\u0011R!!\n\b\u0002\rq\u0012xn\u001c;?\u0013\u0005i\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011QAB\u0005\u0003\u0007\u0011I!!\f\u0002\u0002\u001f!\u000b'O\\3tgR+7\u000f\u001e\"bg\u0016L!a\f\u0019\u0003\u001fQ+7\u000f\u001e+bE2,7i\u001c8gS\u001eT!!\f\u0002\t\rI\u0002\u0001\u0015!\u0003\u001c\u0003\u001d\u0019wN\u001c4jO\u0002Bq\u0001\u000e\u0001C\u0002\u0013%Q'A\u0004s_^$\u0016\u0010]3\u0016\u0003Y\u00022a\u000e A\u001b\u0005A$BA\u001d;\u0003!!\u0018\u0010]3j]\u001a|'BA\u001e=\u0003\u0019\u0019w.\\7p]*\u0011Q\bC\u0001\u0004CBL\u0017BA 
9\u0005=!\u0016\u0010]3J]\u001a|'/\\1uS>t\u0007CA!E\u001b\u0005\u0011%BA\"\t\u0003\u0015!\u0018\u0010]3t\u0013\t)%IA\u0002S_^Daa\u0012\u0001!\u0002\u00131\u0014\u0001\u0003:poRK\b/\u001a\u0011\t\u000f%\u0003!\u0019!C\u0001\u0015\u0006Aa-\u001e8d\u0007>$W-F\u0001L!\ta%K\u0004\u0002N!6\taJC\u0001P\u0003\u0015\u00198-\u00197b\u0013\t\tf*\u0001\u0004Qe\u0016$WMZ\u0005\u0003'R\u0013aa\u0015;sS:<'BA)O\u0011\u00191\u0006\u0001)A\u0005\u0017\u0006Ia-\u001e8d\u0007>$W\r\t\u0005\b1\u0002\u0011\r\u0011\"\u0001K\u0003a1WO\\2D_\u0012,w+\u001b;i\u001d>tW)];bYB\u0013X\r\u001a\u0005\u00075\u0002\u0001\u000b\u0011B&\u00023\u0019,hnY\"pI\u0016<\u0016\u000e\u001e5O_:,\u0015/^1m!J,G\r\t\u0005\b9\u0002\u0011\r\u0011\"\u0001K\u0003e1WO\\2D_\u0012,w+\u001b;i\u001d>tW)];bYB\u0013X\r\u001a\u001a\t\ry\u0003\u0001\u0015!\u0003L\u0003i1WO\\2D_\u0012,w+\u001b;i\u001d>tW)];bYB\u0013X\r\u001a\u001a!\u0011\u0015\u0001\u0007\u0001\"\u0001b\u0003\u0015\"Xm\u001d;Qe>\u001cG+[7f\u0013:tWM\u001d&pS:<\u0016\u000e\u001e5D_6lwN\u001c\"pk:$7\u000fF\u0001c!\ti5-\u0003\u0002e\u001d\n!QK\\5uQ\tyf\r\u0005\u0002hU6\t\u0001N\u0003\u0002j\u0019\u0005)!.\u001e8ji&\u00111\u000e\u001b\u0002\u0005)\u0016\u001cH\u000fC\u0003n\u0001\u0011\u0005\u0011-A\u0014uKN$\bK]8d)&lW-\u00138oKJTu.\u001b8XSRDg*Z4bi&4XMQ8v]\u0012\u001c\bF\u00017g\u0011\u0015\u0001\b\u0001\"\u0001b\u0003\u0011\"Xm\u001d;S_^$\u0016.\\3J]:,'OS8j]^KG\u000f[\"p[6|gNQ8v]\u0012\u001c\bFA8g\u0011\u0015\u0019\b\u0001\"\u0001b\u0003\u0019\"Xm\u001d;S_^$\u0016.\\3J]:,'OS8j]^KG\u000f\u001b(fO\u0006$\u0018N^3C_VtGm\u001d\u0015\u0003e\u001aDQA\u001e\u0001\u0005\u0002\u0005\f\u0001\u0004^3tiJ{w\u000fV5nK2+g\r^(vi\u0016\u0014(j\\5oQ\t)h\rC\u0003z\u0001\u0011\u0005\u0011-A\ruKN$(k\\<US6,'+[4ii>+H/\u001a:K_&t\u0007F\u0001=g\u0011\u0015a\b\u0001\"\u0001b\u0003a!Xm\u001d;S_^$\u0016.\\3Gk2dw*\u001e;fe*{\u0017N\u001c\u0015\u0003w\u001aDQa 
\u0001\u0005\u0002\u0005\fa\u0003^3ti:{gnV5oI><\u0018J\u001c8fe*{\u0017N\u001c\u0015\u0003}\u001aDa!!\u0002\u0001\t\u0003\t\u0017!\t;fgRtuN\\,j]\u0012|w/\u00138oKJTu.\u001b8XSRD'+\u001a;sC\u000e$\bfAA\u0002M\"1\u00111\u0002\u0001\u0005\u0002\u0005\f\u0001\u0006^3ti:{gnV5oI><H*\u001a4u\u0015>LgnV5uQ>,HOT8o\u000bF,\u0018\r\u001c)sK\u0012D3!!\u0003g\u0011\u0019\t\t\u0002\u0001C\u0001C\u0006)C/Z:u\u001d>tw+\u001b8e_^dUM\u001a;K_&tw+\u001b;i\u001d>tW)];bYB\u0013X\r\u001a\u0015\u0004\u0003\u001f1\u0007BBA\f\u0001\u0011\u0005\u0011-A\u0015uKN$hj\u001c8XS:$wn\u001e*jO\"$(j\\5o/&$\bn\\;u\u001d>tW)];bYB\u0013X\r\u001a\u0015\u0004\u0003+1\u0007BBA\u000f\u0001\u0011\u0005\u0011-\u0001\u0014uKN$hj\u001c8XS:$wn\u001e*jO\"$(j\\5o/&$\bNT8o\u000bF,\u0018\r\u001c)sK\u0012D3!a\u0007g\u0011\u0019\t\u0019\u0003\u0001C\u0001C\u0006AC/Z:u\u001d>tw+\u001b8e_^4U\u000f\u001c7K_&tw+\u001b;i_V$hj\u001c8FcV\fG\u000e\u0015:fI\"\u001a\u0011\u0011\u00054\t\r\u0005%\u0002\u0001\"\u0001b\u0003\u0015\"Xm\u001d;O_:<\u0016N\u001c3po\u001a+H\u000e\u001c&pS:<\u0016\u000e\u001e5O_:,\u0015/^1m!J,G\rK\u0002\u0002(\u0019\u0004")
/* loaded from: input_file:org/apache/flink/table/runtime/harness/JoinHarnessTest.class */
public class JoinHarnessTest extends HarnessTestBase {
    // Table configuration for the harness tests; created eagerly with the instance.
    private final HarnessTestBase.TestTableConfig config = new HarnessTestBase.TestTableConfig();
    // Row type information handed to the join functions under test (both inputs use the same value).
    // NOTE(review): assigned outside this section, presumably in the constructor — confirm.
    private final TypeInformation<Row> rowType;
    // Source text passed as the generated join function code ("TestJoinFunction") in the tests below.
    // NOTE(review): assigned outside this section — confirm where.
    private final String funcCode;
    // Presumably join function code that includes a non-equi predicate — not used in the visible
    // tests, verify against the rest of the class.
    private final String funcCodeWithNonEqualPred;
    // Presumably a second variant of the non-equi predicate join code — verify against the rest
    // of the class.
    private final String funcCodeWithNonEqualPred2;

    /** Returns the table configuration instance held by this test class. */
    private HarnessTestBase.TestTableConfig config() {
        return config;
    }

    /** Returns the row type information that the tests pass to the join functions. */
    private TypeInformation<Row> rowType() {
        return rowType;
    }

    /** Returns the join function source code stored in {@code funcCode}. */
    public String funcCode() {
        return funcCode;
    }

    /** Returns the string stored in {@code funcCodeWithNonEqualPred}. */
    public String funcCodeWithNonEqualPred() {
        return funcCodeWithNonEqualPred;
    }

    /** Returns the string stored in {@code funcCodeWithNonEqualPred2}. */
    public String funcCodeWithNonEqualPred2() {
        return funcCodeWithNonEqualPred2;
    }

    /**
     * Runs a {@code ProcTimeBoundedStreamJoin} of type INNER (bound arguments -10 and 20) inside a
     * keyed two-input harness keyed on field 0, checking the number of keyed state entries and
     * processing-time timers along the way and finally the emitted joined records.
     */
    @Test
    public void testProcTimeInnerJoinWithCommonBounds() {
        // Short aliases for the Scala singleton objects used to build rows.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        ProcTimeBoundedStreamJoin joinFunction = new ProcTimeBoundedStreamJoin(JoinType.INNER, -10L, 20L, rowType(), rowType(), "TestJoinFunction", funcCode());
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new LegacyKeyedCoProcessOperator(joinFunction),
                new HarnessTestBase.TupleRowKeySelector(0),
                new HarnessTestBase.TupleRowKeySelector(0),
                Types.INT(), 1, 1, 0);
        harness.open();

        // First input: three rows at processing times 1, 2 and 3.
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a1"})), 1L));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a2"})), 2L));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a3"})), 3L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Second input: rows at processing times 3 and 4.
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1b3"})), 3L));
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2b4"})), 4L));
        Assert.assertEquals(8L, harness.numKeyedStateEntries());
        Assert.assertEquals(4L, harness.numProcessingTimeTimers());

        // One more second-input row at time 13, then jump to time 33 and re-check the counts.
        harness.setProcessingTime(13L);
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1b13"})), 13L));
        harness.setProcessingTime(33L);
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Rows arriving at time 33 on both inputs.
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a33"})), 33L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a33"})), 33L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2b33"})), 33L));

        // Expected joined output records.
        Queue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a1", predef.long2Long(1L), "1b3"})), 3L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a3", predef.long2Long(1L), "1b3"})), 3L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a2", predef.long2Long(2L), "2b4"})), 4L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a3", predef.long2Long(1L), "1b13"})), 13L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a33", predef.long2Long(1L), "1b13"})), 33L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a33", predef.long2Long(2L), "2b33"})), 33L));

        verify(expectedOutput, harness.getOutput());
        harness.close();
    }

    /**
     * Runs a {@code ProcTimeBoundedStreamJoin} of type INNER with two negative bound arguments
     * (-10 and -5) inside a keyed two-input harness keyed on field 0. Checks keyed state entry and
     * processing-time timer counts after each step, down to zero at time 16, and then the two
     * joined records that are expected in the output.
     */
    @Test
    public void testProcTimeInnerJoinWithNegativeBounds() {
        // Short aliases for the Scala singleton objects used to build rows.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        ProcTimeBoundedStreamJoin joinFunction = new ProcTimeBoundedStreamJoin(JoinType.INNER, -10L, -5L, rowType(), rowType(), "TestJoinFunction", funcCode());
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new LegacyKeyedCoProcessOperator(joinFunction),
                new HarnessTestBase.TupleRowKeySelector(0),
                new HarnessTestBase.TupleRowKeySelector(0),
                Types.INT(), 1, 1, 0);
        harness.open();

        // First input: three rows at processing times 1, 2 and 3.
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a1"})), 1L));
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a2"})), 2L));
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a3"})), 3L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Second-input rows: the counts are asserted to stay unchanged after each of them.
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1b3"})), 3L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(7L);
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2b7"})), 7L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(12L);
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1b12"})), 12L));

        // Advance time step by step and watch state and timers drain.
        harness.setProcessingTime(13L);
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(14L);
        Assert.assertEquals(2L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(16L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        // Expected joined output records.
        Queue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "2a2", predef.long2Long(2L), "2b7"})), 7L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "1a3", predef.long2Long(1L), "1b12"})), 12L));

        verify(expectedOutput, harness.getOutput());
        harness.close();
    }

    /**
     * Runs a {@code RowTimeBoundedStreamJoin} of type INNER (bound arguments -10 and 20) wrapped in
     * a {@code KeyedCoProcessOperatorWithWatermarkDelay}, keyed on field 1. Interleaves rows and
     * watermarks on both inputs, checks event-time timer and keyed state entry counts, and compares
     * the output (records and watermarks) with the expected sequence.
     */
    @Test
    public void testRowTimeInnerJoinWithCommonBounds() {
        // Short aliases for the Scala singleton objects used to build rows.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.INNER, -10L, 20L, 0L, rowType(), rowType(), "TestJoinFunction", funcCode(), 0, 0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new KeyedCoProcessOperatorWithWatermarkDelay(joinFunction, joinFunction.getMaxOutputDelay()),
                new HarnessTestBase.TupleRowKeySelector(1),
                new HarnessTestBase.TupleRowKeySelector(1),
                Types.STRING(), 1, 1, 0);
        harness.open();

        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // Rows for key "k1" on both inputs.
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1"})), 0L));
        Assert.assertEquals(1L, harness.numEventTimeTimers());
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        Assert.assertEquals(2L, harness.numEventTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(5L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(15L), "k1"})), 0L));

        harness.processWatermark1(new Watermark(20L));
        harness.processWatermark2(new Watermark(20L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(35L), "k1"})), 0L));

        harness.processWatermark1(new Watermark(38L));
        harness.processWatermark2(new Watermark(38L));

        // Rows for a second key, "k2".
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(39L), "k2"})), 0L));
        Assert.assertEquals(6L, harness.numKeyedStateEntries());

        harness.processWatermark1(new Watermark(61L));
        harness.processWatermark2(new Watermark(61L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Expected output: joined records interleaved with the forwarded watermarks.
        Queue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(-19L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1", predef.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1", predef.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(5L), "k1", predef.long2Long(2L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(5L), "k1", predef.long2Long(15L), "k1"})), 0L));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(35L), "k1", predef.long2Long(15L), "k1"})), 0L));
        expectedOutput.add(new Watermark(18L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(40L), "k2", predef.long2Long(39L), "k2"})), 0L));
        expectedOutput.add(new Watermark(41L));

        verifyWithWatermarks(expectedOutput, harness.getOutput());
        harness.close();
    }

    /**
     * Runs a {@code RowTimeBoundedStreamJoin} of type INNER with two negative bound arguments
     * (-10 and -7) wrapped in a {@code KeyedCoProcessOperatorWithWatermarkDelay}, keyed on field 1.
     * Checks keyed state entry counts as watermarks advance and compares the output (records and
     * watermarks) with the expected sequence.
     */
    @Test
    public void testRowTimeInnerJoinWithNegativeBounds() {
        // Short aliases for the Scala singleton objects used to build rows.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.INNER, -10L, -7L, 0L, rowType(), rowType(), "TestJoinFunction", funcCode(), 0, 0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new KeyedCoProcessOperatorWithWatermarkDelay(joinFunction, joinFunction.getMaxOutputDelay()),
                new HarnessTestBase.TupleRowKeySelector(1),
                new HarnessTestBase.TupleRowKeySelector(1),
                Types.STRING(), 1, 1, 0);
        harness.open();

        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // This second-input row is asserted to leave no keyed state behind.
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        harness.processWatermark1(new Watermark(2L));
        harness.processWatermark2(new Watermark(2L));

        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(3L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(3L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(13L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(6L), "k1"})), 0L));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Advancing the watermarks is expected to shrink the state to 2 and then 0 entries.
        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        // Expected output: joined records interleaved with the forwarded watermarks.
        Queue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(-9L));
        expectedOutput.add(new Watermark(-8L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(3L), "k1", predef.long2Long(13L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(6L), "k1", predef.long2Long(13L), "k1"})), 0L));
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new Watermark(8L));

        verifyWithWatermarks(expectedOutput, harness.getOutput());
        harness.close();
    }

    /**
     * Runs a {@code RowTimeBoundedStreamJoin} of type LEFT_OUTER (bound arguments -5 and 9) wrapped
     * in a {@code KeyedCoProcessOperatorWithWatermarkDelay}, keyed on field 1. Checks event-time
     * timer and keyed state entry counts as watermarks advance, then compares the output with the
     * expected sequence, which includes first-input rows padded with {@code null} fields.
     */
    @Test
    public void testRowTimeLeftOuterJoin() {
        // Short aliases for the Scala singleton objects used to build rows.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.LEFT_OUTER, -5L, 9L, 0L, rowType(), rowType(), "TestJoinFunction", funcCode(), 0, 0);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new KeyedCoProcessOperatorWithWatermarkDelay(joinFunction, joinFunction.getMaxOutputDelay()),
                new HarnessTestBase.TupleRowKeySelector(1),
                new HarnessTestBase.TupleRowKeySelector(1),
                Types.STRING(), 1, 1, 0);
        harness.open();

        // One row per input, under different keys.
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals(2L, harness.numEventTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        harness.processWatermark1(new Watermark(14L));
        harness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, harness.numEventTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, harness.numEventTimeTimers());
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        // These two rows are asserted to create neither state entries nor timers.
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numEventTimeTimers());

        // Rows for key "k1".
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(19L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(26L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(25L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1"})), 0L));

        // Rows for key "k2".
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(39L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(41L), "k2"})), 0L));

        harness.processWatermark1(new Watermark(100L));
        harness.processWatermark2(new Watermark(100L));

        // Expected output: joined and null-padded records interleaved with the forwarded watermarks.
        Queue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1", null, null})), 14L));
        expectedOutput.add(new Watermark(5L));
        expectedOutput.add(new Watermark(9L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1", null, null})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(26L), "k1"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(40L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expectedOutput.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(19L), "k1", null, null})), 32L));
        expectedOutput.add(new Watermark(91L));

        verifyWithWatermarks(expectedOutput, harness.getOutput());
        harness.close();
    }

    /**
     * Exercises a {@link RowTimeBoundedStreamJoin} built with {@code JoinType.RIGHT_OUTER}
     * inside a keyed two-input operator test harness.
     *
     * <p>The test feeds rows into both inputs, advances the watermarks on both inputs, checks the
     * number of registered event-time timers and keyed state entries after each step, and finally
     * compares the harness output against a hand-built queue of records and watermarks via
     * {@code verifyWithWatermarks}.
     *
     * <p>NOTE(review): the numeric constructor arguments {@code -5L, 9L, 0L} are presumably the
     * join window bounds and allowed lateness - confirm against {@code RowTimeBoundedStreamJoin}.
     */
    @Test
    public void testRowTimeRightOuterJoin() {
        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.RIGHT_OUTER, -5L, 9L, 0L, rowType(), rowType(), "TestJoinFunction", funcCode(), 0, 0);
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay(joinFunction, joinFunction.getMaxOutputDelay());
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(operator, new HarnessTestBase.TupleRowKeySelector(1), new HarnessTestBase.TupleRowKeySelector(1), Types.STRING(), 1, 1, 0);
        harness.open();

        // Short aliases for the Scala singletons used to build every row below.
        final CRow$ crow = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        // One row on each input, under different keys.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals(2L, harness.numEventTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Advance both input watermarks to 14.
        harness.processWatermark1(new Watermark(14L));
        harness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, harness.numEventTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // Advance both input watermarks to 18; no timers or state are expected to remain.
        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, harness.numEventTimeTimers());
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        // Rows with value 2 arrive after the watermark reached 18; they must leave no state or timers.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numEventTimeTimers());

        // Interleaved rows for key k1 on both inputs.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(19L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(26L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(25L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1"})), 0L));

        // Interleaved rows for key k2 on both inputs.
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(39L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(41L), "k2"})), 0L));

        // Final watermark on both inputs.
        harness.processWatermark1(new Watermark(100L));
        harness.processWatermark2(new Watermark(100L));

        // Records and watermarks the harness output is compared against.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new Watermark(5L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(1L), "k2"})), 18L));
        expected.add(new Watermark(9L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(2L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(26L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(40L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(39L), "k2"})), 56L));
        expected.add(new Watermark(91L));

        verifyWithWatermarks(expected, harness.getOutput());
        harness.close();
    }

    /**
     * Exercises a {@link RowTimeBoundedStreamJoin} built with {@code JoinType.FULL_OUTER}
     * inside a keyed two-input operator test harness.
     *
     * <p>The input sequence and the intermediate timer/state assertions match the other row-time
     * outer join tests in this class; the expected output additionally contains rows padded with
     * {@code null} on either side.
     *
     * <p>NOTE(review): the numeric constructor arguments {@code -5L, 9L, 0L} are presumably the
     * join window bounds and allowed lateness - confirm against {@code RowTimeBoundedStreamJoin}.
     */
    @Test
    public void testRowTimeFullOuterJoin() {
        RowTimeBoundedStreamJoin joinFunction = new RowTimeBoundedStreamJoin(JoinType.FULL_OUTER, -5L, 9L, 0L, rowType(), rowType(), "TestJoinFunction", funcCode(), 0, 0);
        KeyedCoProcessOperatorWithWatermarkDelay operator = new KeyedCoProcessOperatorWithWatermarkDelay(joinFunction, joinFunction.getMaxOutputDelay());
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(operator, new HarnessTestBase.TupleRowKeySelector(1), new HarnessTestBase.TupleRowKeySelector(1), Types.STRING(), 1, 1, 0);
        harness.open();

        // Short aliases for the Scala singletons used to build every row below.
        final CRow$ crow = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        // One row on each input, under different keys.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k2"})), 0L));
        Assert.assertEquals(2L, harness.numEventTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Advance both input watermarks to 14.
        harness.processWatermark1(new Watermark(14L));
        harness.processWatermark2(new Watermark(14L));
        Assert.assertEquals(1L, harness.numEventTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // Advance both input watermarks to 18; no timers or state are expected to remain.
        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        Assert.assertEquals(0L, harness.numEventTimeTimers());
        Assert.assertEquals(0L, harness.numKeyedStateEntries());

        // Rows with value 2 arrive after the watermark reached 18; they must leave no state or timers.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k2"})), 0L));
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numEventTimeTimers());

        // Interleaved rows for key k1 on both inputs.
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(19L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(26L), "k1"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(25L), "k1"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1"})), 0L));

        // Interleaved rows for key k2 on both inputs.
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(39L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(40L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2"})), 0L));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2"})), 0L));
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(41L), "k2"})), 0L));

        // Final watermark on both inputs.
        harness.processWatermark1(new Watermark(100L));
        harness.processWatermark2(new Watermark(100L));

        // Records and watermarks the harness output is compared against.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "k1", null, null})), 14L));
        expected.add(new Watermark(5L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(1L), "k2"})), 18L));
        expected.add(new Watermark(9L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "k1", null, null})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(2L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(20L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(25L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(21L), "k1", predef.long2Long(26L), "k1"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(40L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(49L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(50L), "k2", predef.long2Long(41L), "k2"})), 0L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.long2Long(19L), "k1", null, null})), 32L));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{null, null, predef.long2Long(39L), "k2"})), 56L));
        expected.add(new Watermark(91L));

        verifyWithWatermarks(expected, harness.getOutput());
        harness.close();
    }

    /**
     * Exercises a {@link NonWindowInnerJoin} wrapped in a {@link LegacyKeyedCoProcessOperator}
     * inside a keyed two-input operator test harness.
     *
     * <p>The test steps the harness processing time from 1 to 9 while feeding rows into both
     * inputs, checks the number of processing-time timers and keyed state entries after each step,
     * and finally compares the harness output with the expected joined rows via {@code verify}.
     * The idle state retention times are taken from {@code config()}.
     */
    @Test
    public void testNonWindowInnerJoin() {
        NonWindowInnerJoin joinFunction = new NonWindowInnerJoin(rowType(), rowType(), "TestJoinFunction", funcCode(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        LegacyKeyedCoProcessOperator operator = new LegacyKeyedCoProcessOperator(joinFunction);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: first left row for key 1.
        harness.setProcessingTime(1L);
        // Short aliases for the Scala singletons used to build every row below.
        final CRow$ crow = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // Processing time 2: a duplicate left row for key 1 and a first left row for key 2.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Processing time 3: a third identical left row, then the first right row for key 1.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(5L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 4: first right row for key 2.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(6L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 5: another right row for key 1; the expected output has no row for it.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(5L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 6 and 9: only the clock moves; state and timers drain to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(2L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(9L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        Queue<Object> actual = harness.getOutput();

        // Joined rows the harness output is compared against.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb", predef.int2Integer(2), "Hello1"}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Exercises a {@link NonWindowInnerJoin} wrapped in a {@link LegacyKeyedCoProcessOperator}
     * with a mix of rows built through {@code apply(values)} and {@code apply(false, values)}.
     *
     * <p>The test steps the harness processing time from 1 to 9, checks the number of
     * processing-time timers and keyed state entries along the way, and finally compares the
     * harness output with the expected rows via {@code verify}.
     *
     * <p>NOTE(review): rows created with {@code apply(false, ...)} are presumably retraction
     * messages, as the test name suggests - confirm against {@code CRow}.
     */
    @Test
    public void testNonWindowInnerJoinWithRetract() {
        NonWindowInnerJoin joinFunction = new NonWindowInnerJoin(rowType(), rowType(), "TestJoinFunction", funcCode(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        LegacyKeyedCoProcessOperator operator = new LegacyKeyedCoProcessOperator(joinFunction);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: first left row for key 1.
        harness.setProcessingTime(1L);
        // Short aliases for the Scala singletons used to build every row below.
        final CRow$ crow = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // Processing time 2: a duplicate left row for key 1 and a first left row for key 2.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Processing time 3: a flagged-false left row, then a right row followed by its flagged-false twin.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 4: first right row for key 2, then another flagged-false left row for key 1.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(5L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.processElement1(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));

        // Processing time 5: a right row for key 1 followed by its flagged-false twin.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 6 and 9: only the clock moves; state and timers drain to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(1L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(9L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        Queue<Object> actual = harness.getOutput();

        // Rows the harness output is compared against.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb", predef.int2Integer(2), "Hello1"}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Exercises a {@link NonWindowLeftRightJoin} wrapped in a {@link LegacyKeyedCoProcessOperator}
     * with a mix of rows built through {@code apply(values)} and {@code apply(false, values)}.
     *
     * <p>The test steps the harness processing time from 1 to 9, checks the number of
     * processing-time timers and keyed state entries along the way, and finally compares the
     * harness output - which includes rows padded with {@code null} on the right side - with the
     * expected rows via {@code verify}.
     *
     * <p>NOTE(review): the {@code true} flag passed to the join constructor presumably selects
     * the left-join variant, and rows created with {@code apply(false, ...)} are presumably
     * retraction messages - confirm against {@code NonWindowLeftRightJoin} and {@code CRow}.
     */
    @Test
    public void testNonWindowLeftJoinWithoutNonEqualPred() {
        NonWindowLeftRightJoin joinFunction = new NonWindowLeftRightJoin(rowType(), rowType(), "TestJoinFunction", funcCode(), true, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        LegacyKeyedCoProcessOperator operator = new LegacyKeyedCoProcessOperator(joinFunction);
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(operator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // Processing time 1: first left row for key 1.
        harness.setProcessingTime(1L);
        // Short aliases for the Scala singletons used to build every row below.
        final CRow$ crow = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // Processing time 2: a duplicate left row for key 1 and a first left row for key 2.
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // Processing time 3: a flagged-false left row, then a right row followed by its flagged-false twin.
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 4: first right row for key 2, then another flagged-false left row for key 1.
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(5L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        harness.processElement1(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa"}))));

        // Processing time 5: a right row for key 1 followed by its flagged-false twin.
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Processing time 6 and 9: only the clock moves; state and timers drain to zero.
        harness.setProcessingTime(6L);
        Assert.assertEquals(1L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(9L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        Queue<Object> actual = harness.getOutput();

        // Rows the harness output is compared against.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", predef.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord(crow.apply(predef.genericWrapArray(new Object[]{predef.int2Integer(2), "bbb", predef.int2Integer(2), "Hello1"}))));
        expected.add(new StreamRecord(crow.apply(false, predef.genericWrapArray(new Object[]{predef.int2Integer(1), "aaa", null, null}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Runs a {@code NonWindowLeftRightJoinWithNonEquiPredicates} (built with the boolean flag
     * {@code true} and the code returned by {@code funcCodeWithNonEqualPred()}) inside a
     * {@code LegacyKeyedCoProcessOperator} on a keyed two-input test harness.
     *
     * <p>The test interleaves records on both inputs while stepping processing time, checks the
     * number of keyed state entries and processing-time timers after each step, and finally
     * compares the emitted records with a fixed expected sequence.
     *
     * <p>Notation used in the comments below: {@code +(..)} is a record built with the default
     * flag, {@code -(..)} is a record built with an explicit {@code false} flag (presumably a
     * retraction -- confirm against {@code CRow}).
     */
    @Test
    public void testNonWindowLeftJoinWithNonEqualPred() {
        LegacyKeyedCoProcessOperator joinOperator = new LegacyKeyedCoProcessOperator(new NonWindowLeftRightJoinWithNonEquiPredicates(rowType(), rowType(), "TestJoinFunction", funcCodeWithNonEqualPred(), true, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime()));
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(joinOperator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // t = 1: first input receives +(1, aaa), -(1, aaa), +(1, bbb).
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(3L, harness.numKeyedStateEntries());

        // t = 2: first input receives +(1, aaa), +(2, bbb).
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(6L, harness.numKeyedStateEntries());

        // t = 3: second input receives +(1, Hi1), -(1, bbb).
        harness.setProcessingTime(3L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 4: second input receives +(2, ccc), +(2, Hello).
        harness.setProcessingTime(4L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "Hello"}))));
        Assert.assertEquals(8L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Still t = 4: -(1, aaa) on the first input, then +(1, Hi2), -(1, Hi2), -(1, Hi1) on the second.
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());

        // t = 5: second input receives +(1, Hi3), -(1, Hi3).
        harness.setProcessingTime(5L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 6 and t = 8: no further input; only the state/timer counts are checked.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        // Expected output, in emission order. Row layout: first-input fields, then second-input fields.
        Queue<Object> actual = harness.getOutput();
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb", Predef$.MODULE$.int2Integer(2), "Hello"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        verify(expected, actual);

        harness.close();
    }

    /**
     * Runs a {@code NonWindowLeftRightJoin} (built with the boolean flag {@code false} and the
     * code returned by {@code funcCode()}) inside a {@code LegacyKeyedCoProcessOperator} on a
     * keyed two-input test harness.
     *
     * <p>The test interleaves records on both inputs while stepping processing time, checks the
     * number of keyed state entries and processing-time timers after each step, and finally
     * compares the emitted records with a fixed expected sequence.
     *
     * <p>Notation used in the comments below: {@code +(..)} is a record built with the default
     * flag, {@code -(..)} is a record built with an explicit {@code false} flag (presumably a
     * retraction -- confirm against {@code CRow}).
     */
    @Test
    public void testNonWindowRightJoinWithoutNonEqualPred() {
        LegacyKeyedCoProcessOperator joinOperator = new LegacyKeyedCoProcessOperator(new NonWindowLeftRightJoin(rowType(), rowType(), "TestJoinFunction", funcCode(), false, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime()));
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(joinOperator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // t = 1: second input receives +(1, aaa).
        harness.setProcessingTime(1L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(2L, harness.numKeyedStateEntries());

        // t = 2: second input receives +(1, aaa), +(2, bbb).
        harness.setProcessingTime(2L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(4L, harness.numKeyedStateEntries());

        // t = 3: second input receives -(1, aaa).
        harness.setProcessingTime(3L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Still t = 3: first input receives +(1, Hi1), -(1, Hi1).
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 4: first input receives +(2, Hello1).
        harness.setProcessingTime(4L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "Hello1"}))));
        Assert.assertEquals(5L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Still t = 4: second input receives -(1, aaa); no counts are checked before time advances.
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));

        // t = 5: first input receives +(1, Hi2), -(1, Hi2).
        harness.setProcessingTime(5L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        Assert.assertEquals(4L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 6 and t = 9: no further input; only the state/timer counts are checked.
        harness.setProcessingTime(6L);
        Assert.assertEquals(1L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(9L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        // Expected output, in emission order. Row layout: first-input fields, then second-input fields.
        Queue<Object> actual = harness.getOutput();
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "Hello1", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        verify(expected, actual);

        harness.close();
    }

    /**
     * Runs a {@code NonWindowLeftRightJoinWithNonEquiPredicates} (built with the boolean flag
     * {@code false} and the code returned by {@code funcCodeWithNonEqualPred2()}) inside a
     * {@code LegacyKeyedCoProcessOperator} on a keyed two-input test harness.
     *
     * <p>The test interleaves records on both inputs while stepping processing time, checks the
     * number of keyed state entries and processing-time timers after each step, and finally
     * compares the emitted records with a fixed expected sequence.
     *
     * <p>Notation used in the comments below: {@code +(..)} is a record built with the default
     * flag, {@code -(..)} is a record built with an explicit {@code false} flag (presumably a
     * retraction -- confirm against {@code CRow}).
     */
    @Test
    public void testNonWindowRightJoinWithNonEqualPred() {
        LegacyKeyedCoProcessOperator joinOperator = new LegacyKeyedCoProcessOperator(new NonWindowLeftRightJoinWithNonEquiPredicates(rowType(), rowType(), "TestJoinFunction", funcCodeWithNonEqualPred2(), false, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime()));
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(joinOperator, new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        harness.open();

        // t = 1: second input receives +(1, aaa), -(1, aaa), +(1, bbb).
        harness.setProcessingTime(1L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        Assert.assertEquals(3L, harness.numKeyedStateEntries());

        // t = 2: second input receives +(1, aaa), +(2, bbb).
        harness.setProcessingTime(2L);
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());
        Assert.assertEquals(6L, harness.numKeyedStateEntries());

        // t = 3: first input receives +(1, Hi1), -(1, bbb).
        harness.setProcessingTime(3L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 4: first input receives +(2, ccc), +(2, Hello).
        harness.setProcessingTime(4L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "Hello"}))));
        Assert.assertEquals(8L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // Still t = 4: -(1, aaa) on the second input, then +(1, Hi2), -(1, Hi2), -(1, Hi1) on the first.
        harness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());

        // t = 5: first input receives +(1, Hi3), -(1, Hi3).
        harness.setProcessingTime(5L);
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        harness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3"}))));
        Assert.assertEquals(7L, harness.numKeyedStateEntries());
        Assert.assertEquals(2L, harness.numProcessingTimeTimers());

        // t = 6 and t = 8: no further input; only the state/timer counts are checked.
        harness.setProcessingTime(6L);
        Assert.assertEquals(3L, harness.numKeyedStateEntries());
        Assert.assertEquals(1L, harness.numProcessingTimeTimers());
        harness.setProcessingTime(8L);
        Assert.assertEquals(0L, harness.numKeyedStateEntries());
        Assert.assertEquals(0L, harness.numProcessingTimeTimers());

        // Expected output, in emission order. Row layout: first-input fields, then second-input fields.
        Queue<Object> actual = harness.getOutput();
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "Hello", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi2", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi1", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(1), "bbb"}))));
        expected.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "Hi3", Predef$.MODULE$.int2Integer(1), "bbb"}))));
        verify(expected, actual);

        harness.close();
    }

    @Test
    public void testNonWindowFullJoinWithoutNonEqualPred() {
        KeyedTwoInputStreamOperatorTestHarness keyedTwoInputStreamOperatorTestHarness = new KeyedTwoInputStreamOperatorTestHarness(new LegacyKeyedCoProcessOperator(new NonWindowFullJoin(rowType(), rowType(), "TestJoinFunction", funcCode(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime())), new HarnessTestBase.TupleRowKeySelector(0), new HarnessTestBase.TupleRowKeySelector(0), BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        keyedTwoInputStreamOperatorTestHarness.open();
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(1L);
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc"}))));
        Assert.assertEquals(1L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(2L);
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ccc"}))));
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(4L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(3L);
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd"}))));
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(5L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(6L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(4L);
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "aaa"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ddd"}))));
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(4L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(5L);
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(4L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(6L);
        Assert.assertEquals(1L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(7L);
        Assert.assertEquals(1L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(2L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.setProcessingTime(8L);
        Assert.assertEquals(0L, keyedTwoInputStreamOperatorTestHarness.numProcessingTimeTimers());
        Assert.assertEquals(0L, keyedTwoInputStreamOperatorTestHarness.numKeyedStateEntries());
        keyedTwoInputStreamOperatorTestHarness.processElement1(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb"}))));
        keyedTwoInputStreamOperatorTestHarness.processElement2(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "bbb"}))));
        Queue<Object> output = keyedTwoInputStreamOperatorTestHarness.getOutput();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(2), "ddd", Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "ccc"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "aaa"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", Predef$.MODULE$.int2Integer(1), "ddd"}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "ccc", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.int2Integer(1), "bbb", null, null}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{null, null, Predef$.MODULE$.int2Integer(2), "bbb"}))));
        verify(concurrentLinkedQueue, output);
        keyedTwoInputStreamOperatorTestHarness.close();
    }

    /**
     * Runs a {@link NonWindowFullJoinWithNonEquiPredicates} function (built from
     * {@code funcCodeWithNonEqualPred2()}) inside a keyed two-input test harness.
     *
     * <p>The test advances processing time step by step, feeds records into both inputs, and after
     * each step checks the number of registered processing-time timers and keyed-state entries.
     * Finally the complete harness output is compared with the expected record sequence.
     */
    @Test
    public void testNonWindowFullJoinWithNonEqualPred() {
        NonWindowFullJoinWithNonEquiPredicates joinFunction = new NonWindowFullJoinWithNonEquiPredicates(
                rowType(),
                rowType(),
                "TestJoinFunction",
                funcCodeWithNonEqualPred2(),
                config().getMinIdleStateRetentionTime(),
                config().getMaxIdleStateRetentionTime());
        KeyedTwoInputStreamOperatorTestHarness harness = new KeyedTwoInputStreamOperatorTestHarness(
                new LegacyKeyedCoProcessOperator(joinFunction),
                new HarnessTestBase.TupleRowKeySelector(0),
                new HarnessTestBase.TupleRowKeySelector(0),
                BasicTypeInfo.INT_TYPE_INFO,
                1,
                1,
                0);
        harness.open();

        // t = 1: two records with key 1 on the first input.
        harness.setProcessingTime(1L);
        harness.processElement1(nonEqualPredInsert(1, "bbb"));
        harness.processElement1(nonEqualPredInsert(1, "ccc"));
        nonEqualPredAssertCounts(harness, 1L, 3L);

        // t = 2: two records with key 2 on the second input.
        harness.setProcessingTime(2L);
        harness.processElement2(nonEqualPredInsert(2, "bbb"));
        harness.processElement2(nonEqualPredInsert(2, "ccc"));
        nonEqualPredAssertCounts(harness, 2L, 6L);

        // t = 3: the opposite input receives records for each of the two keys.
        harness.setProcessingTime(3L);
        harness.processElement1(nonEqualPredInsert(2, "aaa"));
        harness.processElement1(nonEqualPredInsert(2, "ddd"));
        nonEqualPredAssertCounts(harness, 2L, 8L);
        harness.processElement2(nonEqualPredInsert(1, "aaa"));
        harness.processElement2(nonEqualPredInsert(1, "ddd"));
        nonEqualPredAssertCounts(harness, 2L, 10L);

        // t = 4: one record per input is sent again with the flag set to false.
        harness.setProcessingTime(4L);
        harness.processElement1(nonEqualPredRetract(2, "aaa"));
        harness.processElement2(nonEqualPredRetract(1, "ddd"));
        nonEqualPredAssertCounts(harness, 2L, 10L);

        // t = 5 .. 8: only time advances; the counts are expected to drop to zero by t = 8.
        harness.setProcessingTime(5L);
        nonEqualPredAssertCounts(harness, 2L, 10L);
        harness.setProcessingTime(6L);
        nonEqualPredAssertCounts(harness, 1L, 5L);
        harness.setProcessingTime(7L);
        nonEqualPredAssertCounts(harness, 1L, 5L);
        harness.setProcessingTime(8L);
        nonEqualPredAssertCounts(harness, 0L, 0L);

        // One more record per input after the counts reached zero.
        harness.processElement1(nonEqualPredInsert(1, "bbb"));
        harness.processElement2(nonEqualPredInsert(2, "bbb"));

        Queue<Object> actualOutput = harness.getOutput();

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(nonEqualPredInsert(1, "bbb", null, null));
        expectedOutput.add(nonEqualPredInsert(1, "ccc", null, null));
        expectedOutput.add(nonEqualPredInsert(null, null, 2, "bbb"));
        expectedOutput.add(nonEqualPredInsert(null, null, 2, "ccc"));
        expectedOutput.add(nonEqualPredRetract(null, null, 2, "bbb"));
        expectedOutput.add(nonEqualPredRetract(null, null, 2, "ccc"));
        expectedOutput.add(nonEqualPredInsert(2, "aaa", 2, "bbb"));
        expectedOutput.add(nonEqualPredInsert(2, "aaa", 2, "ccc"));
        expectedOutput.add(nonEqualPredInsert(2, "ddd", null, null));
        expectedOutput.add(nonEqualPredRetract(1, "bbb", null, null));
        expectedOutput.add(nonEqualPredRetract(1, "ccc", null, null));
        expectedOutput.add(nonEqualPredInsert(1, "bbb", 1, "ddd"));
        expectedOutput.add(nonEqualPredInsert(1, "ccc", 1, "ddd"));
        expectedOutput.add(nonEqualPredInsert(null, null, 1, "aaa"));
        expectedOutput.add(nonEqualPredRetract(2, "aaa", 2, "bbb"));
        expectedOutput.add(nonEqualPredRetract(2, "aaa", 2, "ccc"));
        expectedOutput.add(nonEqualPredInsert(null, null, 2, "bbb"));
        expectedOutput.add(nonEqualPredInsert(null, null, 2, "ccc"));
        expectedOutput.add(nonEqualPredRetract(1, "bbb", 1, "ddd"));
        expectedOutput.add(nonEqualPredRetract(1, "ccc", 1, "ddd"));
        expectedOutput.add(nonEqualPredInsert(1, "bbb", null, null));
        expectedOutput.add(nonEqualPredInsert(1, "ccc", null, null));
        expectedOutput.add(nonEqualPredInsert(1, "bbb", null, null));
        expectedOutput.add(nonEqualPredInsert(null, null, 2, "bbb"));

        verify(expectedOutput, actualOutput);
        harness.close();
    }

    /**
     * Wraps the given field values in a {@code CRow} (single-argument {@code apply}) and then in a
     * {@link StreamRecord}.
     */
    private static StreamRecord nonEqualPredInsert(Object... fields) {
        return new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(fields)));
    }

    /**
     * Same as {@link #nonEqualPredInsert(Object...)} but passes {@code false} as the leading flag of
     * {@code CRow.apply}; presumably this marks the record as a retraction (confirm against CRow).
     */
    private static StreamRecord nonEqualPredRetract(Object... fields) {
        return new StreamRecord(CRow$.MODULE$.apply(false, Predef$.MODULE$.genericWrapArray(fields)));
    }

    /** Asserts the harness's processing-time timer count first, then its keyed-state entry count. */
    private static void nonEqualPredAssertCounts(KeyedTwoInputStreamOperatorTestHarness harness, long expectedTimers, long expectedStateEntries) {
        Assert.assertEquals(expectedTimers, harness.numProcessingTimeTimers());
        Assert.assertEquals(expectedStateEntries, harness.numKeyedStateEntries());
    }

    public JoinHarnessTest() {
        config().setIdleStateRetentionTime(Time.milliseconds(2L), Time.milliseconds(4L));
        this.rowType = Types.ROW(new TypeInformation[]{Types.LONG(), Types.STRING()});
        this.funcCode = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |\n      |   c.collect(out);\n      |\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();
        this.funcCodeWithNonEqualPred = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |   if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))>0) {\n      |      c.collect(out);\n      |   }\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();
        this.funcCodeWithNonEqualPred2 = new StringOps(Predef$.MODULE$.augmentString("\n      |public class TestJoinFunction\n      |          extends org.apache.flink.api.common.functions.RichFlatJoinFunction {\n      |  transient org.apache.flink.types.Row out =\n      |            new org.apache.flink.types.Row(4);\n      |  public TestJoinFunction() throws Exception {}\n      |\n      |  @Override\n      |  public void open(org.apache.flink.configuration.Configuration parameters)\n      |  throws Exception {}\n      |\n      |  @Override\n      |  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c)\n      |   throws Exception {\n      |   org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;\n      |   org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;\n      |\n      |   out.setField(0, in1.getField(0));\n      |   out.setField(1, in1.getField(1));\n      |   out.setField(2, in2.getField(0));\n      |   out.setField(3, in2.getField(1));\n      |   if(((java.lang.String)in1.getField(1)).compareTo((java.lang.String)in2.getField(1))<0) {\n      |      c.collect(out);\n      |   }\n      |  }\n      |\n      |  @Override\n      |  public void close() throws Exception {}\n      |}\n    ")).stripMargin();
    }
}
