/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.harness;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Types$;
import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOver;
import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRowsOver;
import org.apache.flink.table.runtime.aggregate.ProcTimeUnboundedOver;
import org.apache.flink.table.runtime.aggregate.RowTimeBoundedRangeOver;
import org.apache.flink.table.runtime.aggregate.RowTimeBoundedRowsOver;
import org.apache.flink.table.runtime.aggregate.RowTimeUnboundedRangeOver;
import org.apache.flink.table.runtime.aggregate.RowTimeUnboundedRowsOver;
import org.apache.flink.table.runtime.harness.HarnessTestBase;
import org.apache.flink.table.runtime.types.CRow$;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001)3A!\u0001\u0002\u0001\u001f\t)rJ^3s/&tGm\\<ICJtWm]:UKN$(BA\u0002\u0005\u0003\u001dA\u0017M\u001d8fgNT!!\u0002\u0004\u0002\u000fI,h\u000e^5nK*\u0011q\u0001C\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u0013)\tQA\u001a7j].T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011C\u0001\u0001\u0011!\t\t\"#D\u0001\u0003\u0013\t\u0019\"AA\bICJtWm]:UKN$()Y:f\u0011\u0015)\u0002\u0001\"\u0001\u0017\u0003\u0019a\u0014N\\5u}Q\tq\u0003\u0005\u0002\u0012\u0001!9\u0011\u0004\u0001a\u0001\n#Q\u0012aC9vKJL8i\u001c8gS\u001e,\u0012a\u0007\t\u00039}i\u0011!\b\u0006\u0003=\u0019\t1!\u00199j\u0013\t\u0001SDA\tTiJ,\u0017-\\)vKJL8i\u001c8gS\u001eDqA\t\u0001A\u0002\u0013E1%A\brk\u0016\u0014\u0018pQ8oM&<w\fJ3r)\t!#\u0006\u0005\u0002&Q5\taEC\u0001(\u0003\u0015\u00198-\u00197b\u0013\tIcE\u0001\u0003V]&$\bbB\u0016\"\u0003\u0003\u0005\raG\u0001\u0004q\u0012\n\u0004BB\u0017\u0001A\u0003&1$\u0001\u0007rk\u0016\u0014\u0018pQ8oM&<\u0007\u0005C\u00030\u0001\u0011\u0005\u0001'A\u000euKN$\bK]8d)&lWMQ8v]\u0012,GMU8xg>3XM\u001d\u000b\u0002I!\u0012aF\r\t\u0003gYj\u0011\u0001\u000e\u0006\u0003k1\tQA[;oSRL!a\u000e\u001b\u0003\tQ+7\u000f\u001e\u0005\u0006s\u0001!\t\u0001M\u0001\u001di\u0016\u001cH\u000f\u0015:pGRKW.\u001a\"pk:$W\r\u001a*b]\u001e,wJ^3sQ\tA$\u0007C\u0003=\u0001\u0011\u0005\u0001'A\ruKN$\bK]8d)&lW-\u00168c_VtG-\u001a3Pm\u0016\u0014\bFA\u001e3\u0011\u0015y\u0004\u0001\"\u00011\u0003m!Xm\u001d;S_^$\u0016.\\3C_VtG-\u001a3SC:<Wm\u0014<fe\"\u0012aH\r\u0005\u0006\u0005\u0002!\t\u0001M\u0001\u001bi\u0016\u001cHOU8x)&lWMQ8v]\u0012,GMU8xg>3XM\u001d\u0015\u0003\u0003JBQ!\u0012\u0001\u0005\u0002A\nQ\u0004^3tiJ{w\u000fV5nKVs'm\\;oI\u0016$'+\u00198hK>3XM\u001d\u0015\u0003\tJBQ\u0001\u0013\u0001\u0005\u0002A\nA\u0004^3tiJ{w\u000fV5nKVs'm\\;oI\u0016$'k\\<t\u001fZ,'\u000f\u000b\u0002He\u0001")
public class OverWindowHarnessTest
extends HarnessTestBase {
    private StreamQueryConfig queryConfig = new HarnessTestBase.TestStreamQueryConfig(Time.seconds((long)2L), Time.seconds((long)3L));

    public StreamQueryConfig queryConfig() {
        return this.queryConfig;
    }

    public void queryConfig_$eq(StreamQueryConfig x$1) {
        this.queryConfig = x$1;
    }

    @Test
    public void testProcTimeBoundedRowsOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new ProcTimeBoundedRowsOver(this.genMinMaxAggFunction(), 2L, this.minMaxAggregationStateType(), (TypeInformation)this.minMaxCRowType(), this.queryConfig()));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING());
        testHarness.open();
        testHarness.setProcessingTime(1L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.setProcessingTime(1100L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.setProcessingTime(3001L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.setProcessingTime(6002L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(5L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(1L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(9L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testProcTimeBoundedRangeOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new ProcTimeBoundedRangeOver(this.genMinMaxAggFunction(), 4000L, this.minMaxAggregationStateType(), (TypeInformation)this.minMaxCRowType(), this.queryConfig()));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING());
        testHarness.open();
        testHarness.setProcessingTime(3L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.setProcessingTime(4L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.setProcessingTime(3003L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.setProcessingTime(5L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.setProcessingTime(6002L);
        testHarness.setProcessingTime(7002L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.setProcessingTime(11002L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.setProcessingTime(11004L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        testHarness.setProcessingTime(11006L);
        testHarness.setProcessingTime(20000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "ccc", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.setProcessingTime(22500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "ccc", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.setProcessingTime(23001L);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "ccc", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "ccc", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testProcTimeUnboundedOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new ProcTimeUnboundedOver(this.genMinMaxAggFunction(), this.minMaxAggregationStateType(), this.queryConfig()));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), Types$.MODULE$.STRING());
        testHarness.open();
        testHarness.setProcessingTime(1003L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.setProcessingTime(5003L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(5L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(9L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(0L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testRowTimeBoundedRangeOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new RowTimeBoundedRangeOver(this.genMinMaxAggFunction(), this.minMaxAggregationStateType(), this.minMaxCRowType(), 4000L, 0, (StreamQueryConfig)new HarnessTestBase.TestStreamQueryConfig(Time.seconds((long)1L), Time.seconds((long)2L))));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();
        testHarness.processWatermark(1L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processWatermark(2L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processWatermark(4000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processWatermark(4001L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processWatermark(4002L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processWatermark(4800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L)}))));
        testHarness.processWatermark(6500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.processWatermark(7000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processWatermark(8000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processWatermark(12000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        testHarness.processWatermark(19000L);
        testHarness.setProcessingTime(1000L);
        testHarness.processWatermark(20000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.setProcessingTime(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processWatermark(20010L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(4499L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(4500L);
        int x = testHarness.numKeyedStateEntries();
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L)}))));
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(6500L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.processWatermark(20020L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(8499L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(8500L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testRowTimeBoundedRowsOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new RowTimeBoundedRowsOver(this.genMinMaxAggFunction(), this.minMaxAggregationStateType(), this.minMaxCRowType(), 3L, 0, (StreamQueryConfig)new HarnessTestBase.TestStreamQueryConfig(Time.seconds((long)1L), Time.seconds((long)2L))));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();
        testHarness.processWatermark(800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processWatermark(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processWatermark(4000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.processWatermark(4800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processWatermark(6500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.processWatermark(7000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processWatermark(8000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processWatermark(12000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        testHarness.processWatermark(19000L);
        testHarness.setProcessingTime(1000L);
        testHarness.processWatermark(20000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.setProcessingTime(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processWatermark(20010L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(4499L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(4500L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L)}))));
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(6500L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.processWatermark(20020L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(8499L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(8500L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(5L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(9L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(40L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testRowTimeUnboundedRangeOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new RowTimeUnboundedRangeOver(this.genMinMaxAggFunction(), (TypeInformation)this.minMaxAggregationStateType(), (TypeInformation)this.minMaxCRowType(), 0, (StreamQueryConfig)new HarnessTestBase.TestStreamQueryConfig(Time.seconds((long)1L), Time.seconds((long)2L))));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(1000L);
        testHarness.processWatermark(800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processWatermark(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processWatermark(4000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.processWatermark(4800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processWatermark(6500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.processWatermark(7000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processWatermark(8000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processWatermark(12000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        testHarness.processWatermark(19000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(2999L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(3000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        testHarness.processWatermark(20000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20000L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.setProcessingTime(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(5000L);
        testHarness.processWatermark(20010L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(6999L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(7000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(40L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }

    @Test
    public void testRowTimeUnboundedRowsOver() {
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new RowTimeUnboundedRowsOver(this.genMinMaxAggFunction(), (TypeInformation)this.minMaxAggregationStateType(), (TypeInformation)this.minMaxCRowType(), 0, (StreamQueryConfig)new HarnessTestBase.TestStreamQueryConfig(Time.seconds((long)1L), Time.seconds((long)2L))));
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createHarnessTester(processFunction, new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(1000L);
        testHarness.processWatermark(800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.processWatermark(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processWatermark(4000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        testHarness.processWatermark(4800L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        testHarness.processWatermark(6500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        testHarness.processWatermark(7000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        testHarness.processWatermark(8000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        testHarness.processWatermark(12000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        testHarness.processWatermark(19000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(2999L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(3000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        testHarness.processWatermark(20000L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20000L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        testHarness.setProcessingTime(2500L);
        testHarness.processElement(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(5000L);
        testHarness.processWatermark(20010L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(6999L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() > 0);
        testHarness.setProcessingTime(7000L);
        Predef$.MODULE$.assert(testHarness.numKeyedStateEntries() == 0);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(4L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(5L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(6L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(30L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(7L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(8L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(9L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(10L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(40L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        expectedOutput.add(new StreamRecord((Object)CRow$.MODULE$.apply((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        this.verify(expectedOutput, result);
        testHarness.close();
    }
}

