package org.apache.flink.table.runtime.harness;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOver;
import org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRowsOver;
import org.apache.flink.table.runtime.aggregate.ProcTimeUnboundedOver;
import org.apache.flink.table.runtime.aggregate.RowTimeBoundedRangeOver;
import org.apache.flink.table.runtime.aggregate.RowTimeBoundedRowsOver;
import org.apache.flink.table.runtime.aggregate.RowTimeUnboundedRangeOver;
import org.apache.flink.table.runtime.aggregate.RowTimeUnboundedRowsOver;
import org.apache.flink.table.runtime.harness.HarnessTestBase;
import org.apache.flink.table.runtime.types.CRow$;
import org.junit.Test;
import scala.Predef$;
import scala.reflect.ScalaSignature;

/* compiled from: OverWindowHarnessTest.scala */
@ScalaSignature(bytes = "\u0006\u0001i3A!\u0001\u0002\u0001\u001f\t)rJ^3s/&tGm\\<ICJtWm]:UKN$(BA\u0002\u0005\u0003\u001dA\u0017M\u001d8fgNT!!\u0002\u0004\u0002\u000fI,h\u000e^5nK*\u0011q\u0001C\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u0013)\tQA\u001a7j].T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011C\u0001\u0001\u0011!\t\t\"#D\u0001\u0003\u0013\t\u0019\"AA\bICJtWm]:UKN$()Y:f\u0011\u0015)\u0002\u0001\"\u0001\u0017\u0003\u0019a\u0014N\\5u}Q\tq\u0003\u0005\u0002\u0012\u0001!9\u0011\u0004\u0001a\u0001\n#Q\u0012AB2p]\u001aLw-F\u0001\u001c!\tabF\u0004\u0002\u001eY9\u0011ad\u000b\b\u0003?)r!\u0001I\u0015\u000f\u0005\u0005BcB\u0001\u0012(\u001d\t\u0019c%D\u0001%\u0015\t)c\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u00111\u0002D\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005\u00151\u0011BA\u0002\u0005\u0013\ti#!A\bICJtWm]:UKN$()Y:f\u0013\ty\u0003GA\bUKN$H+\u00192mK\u000e{gNZ5h\u0015\ti#\u0001C\u00043\u0001\u0001\u0007I\u0011C\u001a\u0002\u0015\r|gNZ5h?\u0012*\u0017\u000f\u0006\u00025uA\u0011Q\u0007O\u0007\u0002m)\tq'A\u0003tG\u0006d\u0017-\u0003\u0002:m\t!QK\\5u\u0011\u001dY\u0014'!AA\u0002m\t1\u0001\u001f\u00132\u0011\u0019i\u0004\u0001)Q\u00057\u000591m\u001c8gS\u001e\u0004\u0003\"B 
\u0001\t\u0003\u0001\u0015a\u0007;fgR\u0004&o\\2US6,'i\\;oI\u0016$'k\\<t\u001fZ,'\u000fF\u00015Q\tq$\t\u0005\u0002D\r6\tAI\u0003\u0002F\u0019\u0005)!.\u001e8ji&\u0011q\t\u0012\u0002\u0005)\u0016\u001cH\u000fC\u0003J\u0001\u0011\u0005\u0001)\u0001\u000fuKN$\bK]8d)&lWMQ8v]\u0012,GMU1oO\u0016|e/\u001a:)\u0005!\u0013\u0005\"\u0002'\u0001\t\u0003\u0001\u0015!\u0007;fgR\u0004&o\\2US6,WK\u001c2pk:$W\rZ(wKJD#a\u0013\"\t\u000b=\u0003A\u0011\u0001!\u00027Q,7\u000f\u001e*poRKW.\u001a\"pk:$W\r\u001a*b]\u001e,wJ^3sQ\tq%\tC\u0003S\u0001\u0011\u0005\u0001)\u0001\u000euKN$(k\\<US6,'i\\;oI\u0016$'k\\<t\u001fZ,'\u000f\u000b\u0002R\u0005\")Q\u000b\u0001C\u0001\u0001\u0006iB/Z:u%><H+[7f+:\u0014w.\u001e8eK\u0012\u0014\u0016M\\4f\u001fZ,'\u000f\u000b\u0002U\u0005\")\u0001\f\u0001C\u0001\u0001\u0006aB/Z:u%><H+[7f+:\u0014w.\u001e8eK\u0012\u0014vn^:Pm\u0016\u0014\bFA,C\u0001")
/* loaded from: input_file:org/apache/flink/table/runtime/harness/OverWindowHarnessTest.class */
public class OverWindowHarnessTest extends HarnessTestBase {
    /** Table configuration shared by the tests in this class; replaceable through {@link #config_$eq}. */
    private HarnessTestBase.TestTableConfig config = new HarnessTestBase.TestTableConfig();

    /**
     * Accessor for the current test table configuration.
     *
     * @return the configuration instance currently held by this test
     */
    public HarnessTestBase.TestTableConfig config() {
        return config;
    }

    /**
     * Scala-style setter ({@code config_=}) that swaps in a different test table configuration.
     *
     * @param testTableConfig the configuration to use from now on
     */
    public void config_$eq(HarnessTestBase.TestTableConfig testTableConfig) {
        config = testTableConfig;
    }

    /**
     * Runs a {@link ProcTimeBoundedRowsOver} function (constructed with a bound of {@code 2L}) inside a
     * {@link KeyedProcessOperator}, feeds it rows while stepping the harness's processing time, and
     * compares everything the operator emitted with a hand-written expected queue.
     *
     * <p>Each input row has three fields; each expected row repeats them and appends two more values
     * (presumably the min and max produced by {@code genMinMaxAggFunction()} - confirm in HarnessTestBase).
     */
    @Test
    public void testProcTimeBoundedRowsOver() {
        // Scala companion singletons, read once instead of on every row.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        // Build the operator under test step by step, then wrap it in a keyed harness.
        ProcTimeBoundedRowsOver function = new ProcTimeBoundedRowsOver(genMinMaxAggFunction(), 2L, minMaxAggregationStateType(), minMaxCRowType(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        KeyedProcessOperator operator = new KeyedProcessOperator(function);
        KeyedOneInputStreamOperatorTestHarness harness = createHarnessTester(operator, new HarnessTestBase.TupleRowKeySelector(1), Types.STRING());
        harness.open();

        // Processing time 1.
        harness.setProcessingTime(1L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(1L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(10L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(2L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(3L)}))));

        // Processing time 1100.
        harness.setProcessingTime(1100L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(20L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(4L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(5L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(6L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(30L)}))));

        // Processing time 3001.
        harness.setProcessingTime(3001L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(7L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(8L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(9L)}))));

        // Processing time 6002. The expected rows for this step restart their trailing values
        // (presumably because idle state was cleaned up - confirm against TestTableConfig retention).
        harness.setProcessingTime(6002L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(10L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "bbb", predef.long2Long(40L)}))));

        Queue<Object> actual = harness.getOutput();

        // Expected emissions, in arrival order.
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(1L), predef.long2Long(1L), predef.long2Long(1L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(10L), predef.long2Long(10L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(2L), predef.long2Long(1L), predef.long2Long(2L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(3L), predef.long2Long(2L), predef.long2Long(3L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(20L), predef.long2Long(10L), predef.long2Long(20L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(4L), predef.long2Long(3L), predef.long2Long(4L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(5L), predef.long2Long(4L), predef.long2Long(5L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "aaa", predef.long2Long(6L), predef.long2Long(5L), predef.long2Long(6L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(1L), "bbb", predef.long2Long(30L), predef.long2Long(20L), predef.long2Long(30L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(7L), predef.long2Long(6L), predef.long2Long(7L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(8L), predef.long2Long(7L), predef.long2Long(8L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(9L), predef.long2Long(8L), predef.long2Long(9L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "aaa", predef.long2Long(10L), predef.long2Long(10L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(2L), "bbb", predef.long2Long(40L), predef.long2Long(40L), predef.long2Long(40L)}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Runs a {@link ProcTimeBoundedRangeOver} function (constructed with a bound of {@code 4000L}) inside
     * a {@link KeyedProcessOperator}, feeds it rows for the keys "aaa", "bbb" and "ccc" while stepping the
     * harness's processing time, and compares the emitted records with a hand-written expected queue.
     *
     * <p>Each input row has three fields; each expected row repeats them and appends two more values
     * (presumably the min and max produced by {@code genMinMaxAggFunction()} - confirm in HarnessTestBase).
     */
    @Test
    public void testProcTimeBoundedRangeOver() {
        // Scala companion singletons, read once instead of on every row.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        // Build the operator under test step by step, then wrap it in a keyed harness.
        ProcTimeBoundedRangeOver function = new ProcTimeBoundedRangeOver(genMinMaxAggFunction(), 4000L, minMaxAggregationStateType(), minMaxCRowType(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        KeyedProcessOperator operator = new KeyedProcessOperator(function);
        KeyedOneInputStreamOperatorTestHarness harness = createHarnessTester(operator, new HarnessTestBase.TupleRowKeySelector(1), Types.STRING());
        harness.open();

        // Processing time 3.
        harness.setProcessingTime(3L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(1L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(10L)}))));

        // Processing time 4.
        harness.setProcessingTime(4L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(2L)}))));

        // Processing time 3003.
        harness.setProcessingTime(3003L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(3L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(20L)}))));

        // The clock is deliberately set to a smaller value (5) than the previous step (3003);
        // the expected output below depends on this exact sequence.
        harness.setProcessingTime(5L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(4L)}))));

        // Two consecutive time advances with no element in between.
        harness.setProcessingTime(6002L);
        harness.setProcessingTime(7002L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(5L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(6L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(30L)}))));

        // Processing time 11002.
        harness.setProcessingTime(11002L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(7L)}))));

        // Processing time 11004.
        harness.setProcessingTime(11004L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(8L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(9L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(10L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(40L)}))));

        // Again two time advances back to back, then the first "ccc" row.
        harness.setProcessingTime(11006L);
        harness.setProcessingTime(20000L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "ccc", predef.long2Long(10L)}))));

        // Processing time 22500.
        harness.setProcessingTime(22500L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "ccc", predef.long2Long(20L)}))));

        // Final time advance before the output is collected.
        harness.setProcessingTime(23001L);

        Queue<Object> actual = harness.getOutput();

        // Expected emissions, in the order they are compared.
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(1L), predef.long2Long(1L), predef.long2Long(1L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(10L), predef.long2Long(10L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(2L), predef.long2Long(1L), predef.long2Long(2L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(3L), predef.long2Long(3L), predef.long2Long(4L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(20L), predef.long2Long(20L), predef.long2Long(20L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(4L), predef.long2Long(4L), predef.long2Long(4L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(5L), predef.long2Long(5L), predef.long2Long(6L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(6L), predef.long2Long(5L), predef.long2Long(6L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(30L), predef.long2Long(30L), predef.long2Long(30L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(7L), predef.long2Long(7L), predef.long2Long(7L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(8L), predef.long2Long(7L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(9L), predef.long2Long(7L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(10L), predef.long2Long(7L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(40L), predef.long2Long(40L), predef.long2Long(40L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "ccc", predef.long2Long(10L), predef.long2Long(10L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "ccc", predef.long2Long(20L), predef.long2Long(10L), predef.long2Long(20L)}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Runs a {@link ProcTimeUnboundedOver} function inside a {@link KeyedProcessOperator}, feeds it rows
     * for the keys "aaa" and "bbb" at two processing-time steps (1003 and 5003), and compares the
     * emitted records with a hand-written expected queue.
     *
     * <p>Each input row has three fields; each expected row repeats them and appends two more values
     * (presumably the min and max produced by {@code genMinMaxAggFunction()} - confirm in HarnessTestBase).
     */
    @Test
    public void testProcTimeUnboundedOver() {
        // Scala companion singletons, read once instead of on every row.
        final CRow$ rows = CRow$.MODULE$;
        final Predef$ predef = Predef$.MODULE$;

        // Build the operator under test step by step, then wrap it in a keyed harness.
        ProcTimeUnboundedOver function = new ProcTimeUnboundedOver(genMinMaxAggFunction(), minMaxAggregationStateType(), config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime());
        KeyedProcessOperator operator = new KeyedProcessOperator(function);
        KeyedOneInputStreamOperatorTestHarness harness = createHarnessTester(operator, new HarnessTestBase.TupleRowKeySelector(1), Types.STRING());
        harness.open();

        // Processing time 1003.
        harness.setProcessingTime(1003L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(1L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(10L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(2L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(3L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(20L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(4L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(5L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(6L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(30L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(7L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(8L)}))));

        // Processing time 5003. The expected rows for this step restart their trailing values
        // (presumably because idle state was cleaned up - confirm against TestTableConfig retention).
        harness.setProcessingTime(5003L);
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(9L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(10L)}))));
        harness.processElement(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(40L)}))));

        Queue<Object> actual = harness.getOutput();

        // Expected emissions, in arrival order.
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(1L), predef.long2Long(1L), predef.long2Long(1L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(10L), predef.long2Long(10L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(2L), predef.long2Long(1L), predef.long2Long(2L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(3L), predef.long2Long(1L), predef.long2Long(3L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(20L), predef.long2Long(10L), predef.long2Long(20L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(4L), predef.long2Long(1L), predef.long2Long(4L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(5L), predef.long2Long(1L), predef.long2Long(5L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(6L), predef.long2Long(1L), predef.long2Long(6L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(30L), predef.long2Long(10L), predef.long2Long(30L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(7L), predef.long2Long(1L), predef.long2Long(7L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(8L), predef.long2Long(1L), predef.long2Long(8L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(9L), predef.long2Long(9L), predef.long2Long(9L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "aaa", predef.long2Long(10L), predef.long2Long(9L), predef.long2Long(10L)}))));
        expected.add(new StreamRecord(rows.apply(predef.genericWrapArray(new Object[]{predef.long2Long(0L), "bbb", predef.long2Long(40L), predef.long2Long(40L), predef.long2Long(40L)}))));

        verify(expected, actual);
        harness.close();
    }

    @Test
    public void testRowTimeBoundedRangeOver() {
        config().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));
        KeyedOneInputStreamOperatorTestHarness createHarnessTester = createHarnessTester(new KeyedProcessOperator(new RowTimeBoundedRangeOver(genMinMaxAggFunction(), minMaxAggregationStateType(), minMaxCRowType(), 4000L, 0, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime())), new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        createHarnessTester.open();
        createHarnessTester.processWatermark(1L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        createHarnessTester.processWatermark(2L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        createHarnessTester.processWatermark(4000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        createHarnessTester.processWatermark(4001L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        createHarnessTester.processWatermark(4002L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        createHarnessTester.processWatermark(4800L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L)}))));
        createHarnessTester.processWatermark(6500L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        createHarnessTester.processWatermark(7000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        createHarnessTester.processWatermark(8000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        createHarnessTester.processWatermark(12000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        createHarnessTester.processWatermark(19000L);
        createHarnessTester.setProcessingTime(1000L);
        createHarnessTester.processWatermark(20000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        createHarnessTester.setProcessingTime(2500L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        createHarnessTester.processWatermark(20010L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(4499L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(4500L);
        createHarnessTester.numKeyedStateEntries();
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() == 0);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L)}))));
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(6500L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.processWatermark(20020L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(8499L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(8500L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() == 0);
        Queue<Object> output = createHarnessTester.getOutput();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(2L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(3L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4002L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4003L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(4L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4801L), "bbb", Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(25L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(6L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(7L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(8L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(25L), Predef$.MODULE$.long2Long(30L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(10L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(40L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20011L), "ccc", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(3L)}))));
        verify(concurrentLinkedQueue, output);
        createHarnessTester.close();
    }

    /**
     * Drives a {@link RowTimeBoundedRowsOver} function (constructed with a preceding offset of
     * {@code 3L}) through a keyed test harness and compares the emitted rows with a fixed
     * expectation.
     *
     * <p>Input rows are {@code (rowtime, key, value)}; the harness is keyed on field 1. Expected
     * rows carry two additional trailing fields produced by the min/max aggregation helpers of the
     * base class. The test also checks, via {@code numKeyedStateEntries()}, at which processing
     * times the keyed state is still present and at which it has been removed, with idle-state
     * retention configured as min 1s / max 2s.
     */
    @Test
    public void testRowTimeBoundedRowsOver() {
        // Scala singletons used to build every record below.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rowFactory = CRow$.MODULE$;

        config().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));
        RowTimeBoundedRowsOver overFunction = new RowTimeBoundedRowsOver(
                genMinMaxAggFunction(),
                minMaxAggregationStateType(),
                minMaxCRowType(),
                3L,
                0,
                config().getMinIdleStateRetentionTime(),
                config().getMaxIdleStateRetentionTime());
        KeyedProcessOperator operator = new KeyedProcessOperator(overFunction);
        KeyedOneInputStreamOperatorTestHarness harness = createHarnessTester(
                operator,
                new HarnessTestBase.TupleRowKeySelector(1),
                BasicTypeInfo.STRING_TYPE_INFO);
        harness.open();

        // Phase 1: interleave watermarks and rows for keys "aaa" and "bbb".
        harness.processWatermark(800L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{801L, "aaa", 1L}))));
        harness.processWatermark(2500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{2501L, "bbb", 10L}))));
        harness.processWatermark(4000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 2L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 3L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "bbb", 20L}))));
        harness.processWatermark(4800L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4801L, "aaa", 4L}))));
        harness.processWatermark(6500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 5L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 6L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "bbb", 30L}))));
        harness.processWatermark(7000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{7001L, "aaa", 7L}))));
        harness.processWatermark(8000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{8001L, "aaa", 8L}))));
        harness.processWatermark(12000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 9L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 10L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "bbb", 40L}))));
        harness.processWatermark(19000L);

        // Phase 2: advance processing time while feeding key "ccc".
        harness.setProcessingTime(1000L);
        harness.processWatermark(20000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20001L, "ccc", 1L}))));
        harness.setProcessingTime(2500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20002L, "ccc", 2L}))));
        harness.processWatermark(20010L);

        // State must survive up to processing time 4499 and be gone at 4500
        // (presumably 2500 + the 2s max retention -- confirm against the operator).
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(4499L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(4500L);
        predef.assert(harness.numKeyedStateEntries() == 0);

        // Phase 3: a new row re-creates state; it must last until 8499 and be gone at 8500.
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20011L, "ccc", 3L}))));
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(6500L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.processWatermark(20020L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(8499L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(8500L);
        predef.assert(harness.numKeyedStateEntries() == 0);

        Queue<Object> actual = harness.getOutput();

        // Expected rows: (rowtime, key, value, <agg 1>, <agg 2>).
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{801L, "aaa", 1L, 1L, 1L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{2501L, "bbb", 10L, 10L, 10L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 2L, 1L, 2L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 3L, 1L, 3L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "bbb", 20L, 10L, 20L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4801L, "aaa", 4L, 2L, 4L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 5L, 3L, 5L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 6L, 4L, 6L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "bbb", 30L, 10L, 30L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{7001L, "aaa", 7L, 5L, 7L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{8001L, "aaa", 8L, 6L, 8L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 9L, 7L, 9L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 10L, 8L, 10L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "bbb", 40L, 20L, 40L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20001L, "ccc", 1L, 1L, 1L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20002L, "ccc", 2L, 1L, 2L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20011L, "ccc", 3L, 3L, 3L}))));

        verify(expected, actual);
        harness.close();
    }

    /**
     * Drives a {@link RowTimeUnboundedRangeOver} function through a keyed test harness and compares
     * the emitted rows with a fixed expectation.
     *
     * <p>Input rows are {@code (rowtime, key, value)}; the harness is keyed on field 1. Expected
     * rows carry two additional trailing fields produced by the min/max aggregation helpers of the
     * base class. The test also checks, via {@code numKeyedStateEntries()}, at which processing
     * times the keyed state is still present and at which it has been removed, with idle-state
     * retention configured as min 1s / max 2s.
     */
    @Test
    public void testRowTimeUnboundedRangeOver() {
        // Scala singletons used to build every record below.
        final Predef$ predef = Predef$.MODULE$;
        final CRow$ rowFactory = CRow$.MODULE$;

        config().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));
        RowTimeUnboundedRangeOver overFunction = new RowTimeUnboundedRangeOver(
                genMinMaxAggFunction(),
                minMaxAggregationStateType(),
                minMaxCRowType(),
                0,
                config().getMinIdleStateRetentionTime(),
                config().getMaxIdleStateRetentionTime());
        KeyedProcessOperator operator = new KeyedProcessOperator(overFunction);
        KeyedOneInputStreamOperatorTestHarness harness = createHarnessTester(
                operator,
                new HarnessTestBase.TupleRowKeySelector(1),
                BasicTypeInfo.STRING_TYPE_INFO);
        harness.open();

        // Phase 1: at processing time 1000, interleave watermarks and rows for "aaa" and "bbb".
        harness.setProcessingTime(1000L);
        harness.processWatermark(800L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{801L, "aaa", 1L}))));
        harness.processWatermark(2500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{2501L, "bbb", 10L}))));
        harness.processWatermark(4000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 2L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 3L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "bbb", 20L}))));
        harness.processWatermark(4800L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4801L, "aaa", 4L}))));
        harness.processWatermark(6500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 5L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 6L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "bbb", 30L}))));
        harness.processWatermark(7000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{7001L, "aaa", 7L}))));
        harness.processWatermark(8000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{8001L, "aaa", 8L}))));
        harness.processWatermark(12000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 9L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 10L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "bbb", 40L}))));
        harness.processWatermark(19000L);

        // State must survive up to processing time 2999 and be gone at 3000
        // (presumably 1000 + the 2s max retention -- confirm against the operator).
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(2999L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(3000L);
        predef.assert(harness.numKeyedStateEntries() == 0);

        // Phase 2: key "ccc". The first row's rowtime equals the current watermark (20000) and
        // has no counterpart in the expected output -- presumably it is discarded as late data.
        harness.processWatermark(20000L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20000L, "ccc", 1L}))));
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20001L, "ccc", 1L}))));
        // NOTE(review): processing time is set to 2500 here, below the 3000 set earlier.
        harness.setProcessingTime(2500L);
        harness.processElement(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20002L, "ccc", 2L}))));
        predef.assert(harness.numKeyedStateEntries() > 0);

        // After the watermark at processing time 5000, state must last until 6999 and be gone at 7000.
        harness.setProcessingTime(5000L);
        harness.processWatermark(20010L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(6999L);
        predef.assert(harness.numKeyedStateEntries() > 0);
        harness.setProcessingTime(7000L);
        predef.assert(harness.numKeyedStateEntries() == 0);

        Queue<Object> actual = harness.getOutput();

        // Expected rows: (rowtime, key, value, <agg 1>, <agg 2>).
        ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{801L, "aaa", 1L, 1L, 1L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{2501L, "bbb", 10L, 10L, 10L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 2L, 1L, 3L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "aaa", 3L, 1L, 3L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4001L, "bbb", 20L, 10L, 20L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{4801L, "aaa", 4L, 1L, 4L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 5L, 1L, 6L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "aaa", 6L, 1L, 6L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{6501L, "bbb", 30L, 10L, 30L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{7001L, "aaa", 7L, 1L, 7L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{8001L, "aaa", 8L, 1L, 8L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 9L, 1L, 10L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "aaa", 10L, 1L, 10L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{12001L, "bbb", 40L, 10L, 40L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20001L, "ccc", 1L, 1L, 1L}))));
        expected.add(new StreamRecord(rowFactory.apply(predef.genericWrapArray(new Object[]{20002L, "ccc", 2L, 1L, 2L}))));

        verify(expected, actual);
        harness.close();
    }

    @Test
    public void testRowTimeUnboundedRowsOver() {
        config().setIdleStateRetentionTime(Time.seconds(1L), Time.seconds(2L));
        KeyedOneInputStreamOperatorTestHarness createHarnessTester = createHarnessTester(new KeyedProcessOperator(new RowTimeUnboundedRowsOver(genMinMaxAggFunction(), minMaxAggregationStateType(), minMaxCRowType(), 0, config().getMinIdleStateRetentionTime(), config().getMaxIdleStateRetentionTime())), new HarnessTestBase.TupleRowKeySelector(1), BasicTypeInfo.STRING_TYPE_INFO);
        createHarnessTester.open();
        createHarnessTester.setProcessingTime(1000L);
        createHarnessTester.processWatermark(800L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L)}))));
        createHarnessTester.processWatermark(2500L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L)}))));
        createHarnessTester.processWatermark(4000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L)}))));
        createHarnessTester.processWatermark(4800L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L)}))));
        createHarnessTester.processWatermark(6500L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L)}))));
        createHarnessTester.processWatermark(7000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L)}))));
        createHarnessTester.processWatermark(8000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L)}))));
        createHarnessTester.processWatermark(12000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L)}))));
        createHarnessTester.processWatermark(19000L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(2999L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(3000L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() == 0);
        createHarnessTester.processWatermark(20000L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20000L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L)}))));
        createHarnessTester.setProcessingTime(2500L);
        createHarnessTester.processElement(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L)}))));
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(5000L);
        createHarnessTester.processWatermark(20010L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(6999L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() > 0);
        createHarnessTester.setProcessingTime(7000L);
        Predef$.MODULE$.assert(createHarnessTester.numKeyedStateEntries() == 0);
        Queue<Object> output = createHarnessTester.getOutput();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(801L), "aaa", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(2501L), "bbb", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(10L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "aaa", Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(3L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4001L), "bbb", Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(20L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(4801L), "aaa", Predef$.MODULE$.long2Long(4L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(4L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(5L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(5L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "aaa", Predef$.MODULE$.long2Long(6L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(6L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(6501L), "bbb", Predef$.MODULE$.long2Long(30L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(30L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(7001L), "aaa", Predef$.MODULE$.long2Long(7L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(7L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(8001L), "aaa", Predef$.MODULE$.long2Long(8L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(8L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(9L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(9L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "aaa", Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(10L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(12001L), "bbb", Predef$.MODULE$.long2Long(40L), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.long2Long(40L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20001L), "ccc", Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(1L)}))));
        concurrentLinkedQueue.add(new StreamRecord(CRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{Predef$.MODULE$.long2Long(20002L), "ccc", Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.long2Long(2L)}))));
        verify(concurrentLinkedQueue, output);
        createHarnessTester.close();
    }

    public OverWindowHarnessTest() {
        config().setIdleStateRetentionTime(Time.seconds(2L), Time.seconds(3L));
    }
}
