/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.WindowOperator;
import org.apache.flink.table.runtime.operators.window.WindowOperatorBuilder;
import org.apache.flink.table.runtime.operators.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor;
import org.apache.flink.table.runtime.util.BinaryRowKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Assert;
import org.junit.Test;

public class WindowOperatorTest {
    // Counter reset to 0 at the start of each test below; tests that close two harnesses
    // assert that it equals 2 afterwards.
    // NOTE(review): the code that increments it is not visible in this section — presumably
    // the aggregate function's close(); confirm against the rest of the file.
    private static AtomicInteger closeCalled = new AtomicInteger(0);
    // Input row layout handed to the operator builder: (VARCHAR, INT, BIGINT).
    private LogicalType[] inputFieldTypes = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
    // Output row layout used by the assertor: one VARCHAR followed by five BIGINT fields.
    private BaseRowTypeInfo outputType = new BaseRowTypeInfo(new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()});
    // Type arrays passed to WindowOperatorBuilder#aggregate(...) by the tests: two aggregate
    // result fields, two accumulator fields and three window property fields.
    private LogicalType[] aggResultTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
    private LogicalType[] accTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
    private LogicalType[] windowTypes = new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType()};
    // Record equaliser handed to the builder; constructed from the accumulator and window types.
    private GenericRowEqualiser equaliser = new GenericRowEqualiser(this.accTypes, this.windowTypes);
    // Key selector over field index 0 of the input row, and the key type it produces.
    private BinaryRowKeySelector keySelector = new BinaryRowKeySelector(new int[]{0}, this.inputFieldTypes);
    private TypeInformation<BaseRow> keyType = this.keySelector.getProducedType();
    // Shared assertor for comparing expected and actual harness output. The comparator is built
    // with (0, VARCHAR) — presumably it orders rows by the first (key) field; confirm in
    // GenericRowRecordSortComparator.
    private BaseRowHarnessAssertor assertor = new BaseRowHarnessAssertor(this.outputType.getFieldTypes(), new GenericRowRecordSortComparator(0, (LogicalType)new VarCharType(Integer.MAX_VALUE)));

    /**
     * Builds an event-time sliding window operator (3 s size, 1 s slide, time field at index 2),
     * feeds records for two keys, and checks the output after each watermark. Half-way through,
     * the state is snapshotted and restored into a second harness around the same operator.
     */
    @Test
    public void testEventTimeSlidingWindows() throws Exception {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L))
                .withEventTime(2)
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        harness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // All input arrives before the first watermark.
        harness.processElement(StreamRecordUtils.record("key2", 1, 3999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 3000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 20L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 0L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1998L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1000L));

        // Watermark 999: only key1 is expected to emit.
        harness.processWatermark(new Watermark(999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, -2000L, 1000L, 999L));
        expected.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Watermark 1999: both keys are expected to emit.
        harness.processWatermark(new Watermark(1999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, -1000L, 2000L, 1999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, -1000L, 2000L, 1999L));
        expected.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Watermark 2999.
        harness.processWatermark(new Watermark(2999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Snapshot, close the first harness, and restore into a fresh one around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(windowOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // From here on only key2 is expected to emit.
        harness.processWatermark(new Watermark(3999L));
        expected.add(StreamRecordUtils.record("key2", 5L, 5L, 1000L, 4000L, 3999L));
        expected.add(new Watermark(3999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(4999L));
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 2000L, 5000L, 4999L));
        expected.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(5999L));
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expected.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Later watermarks are expected to be forwarded without any further rows.
        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Builds a processing-time sliding window operator (3 s size, 1 s slide) and checks the rows
     * emitted each time the harness' processing time is advanced.
     */
    @Test
    public void testProcessingTimeSlidingWindows() throws Throwable {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L))
                .withProcessingTime()
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // One key2 record at processing time 3, then advance to 1000.
        harness.setProcessingTime(3L);
        harness.processElement(StreamRecordUtils.record("key2", 1, Long.MAX_VALUE));
        harness.setProcessingTime(1000L);
        expected.add(StreamRecordUtils.record("key2", 1L, 1L, -2000L, 1000L, 999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Two more key2 records, then advance to 2000.
        harness.processElement(StreamRecordUtils.record("key2", 1, Long.MAX_VALUE));
        harness.processElement(StreamRecordUtils.record("key2", 1, Long.MAX_VALUE));
        harness.setProcessingTime(2000L);
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, -1000L, 2000L, 1999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Two key1 records, then advance to 3000.
        harness.processElement(StreamRecordUtils.record("key1", 1, Long.MAX_VALUE));
        harness.processElement(StreamRecordUtils.record("key1", 1, Long.MAX_VALUE));
        harness.setProcessingTime(3000L);
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key1", 2L, 2L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Three more key1 records, then jump to 7000; several rows are expected at once.
        harness.processElement(StreamRecordUtils.record("key1", 1, Long.MAX_VALUE));
        harness.processElement(StreamRecordUtils.record("key1", 1, Long.MAX_VALUE));
        harness.processElement(StreamRecordUtils.record("key1", 1, Long.MAX_VALUE));
        harness.setProcessingTime(7000L);
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 1000L, 4000L, 3999L));
        expected.add(StreamRecordUtils.record("key1", 5L, 5L, 1000L, 4000L, 3999L));
        expected.add(StreamRecordUtils.record("key1", 5L, 5L, 2000L, 5000L, 4999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 3000L, 6000L, 5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
    }

    /**
     * Builds an event-time tumbling window operator (3 s size, time field at index 2), feeds
     * records for two keys, and checks the output after each watermark. The state is snapshotted
     * after watermark 1999 and restored into a second harness around the same operator.
     */
    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .tumble(Duration.ofSeconds(3L))
                .withEventTime(2)
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // All input arrives before the first watermark.
        harness.processElement(StreamRecordUtils.record("key2", 1, 3999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 3000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 20L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 0L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1998L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1000L));

        // Watermarks 999 and 1999 are expected to pass through without any rows.
        harness.processWatermark(new Watermark(999L));
        expected.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Snapshot, close the first harness, and restore into a fresh one around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(windowOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // Watermark 2999: both keys are expected to emit.
        harness.processWatermark(new Watermark(2999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(3999L));
        expected.add(new Watermark(3999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(4999L));
        expected.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Watermark 5999: key2 is expected to emit.
        harness.processWatermark(new Watermark(5999L));
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expected.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Later watermarks are expected to be forwarded without any further rows.
        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Builds an event-time tumbling window operator (3 s size) whose trigger is
     * {@code EventTimeTriggers.afterEndOfWindow()} with early firings every second of processing
     * time, with retractions enabled. Checks the output as processing time and watermarks advance,
     * including across a snapshot/restore into a second harness.
     */
    @Test
    public void testEventTimeTumblingWindowsWithEarlyFiring() throws Exception {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        // Raw Trigger types are kept on purpose, mirroring how the trigger is handed to the builder.
        Trigger earlyFirings = ProcessingTimeTriggers.every(Duration.ofSeconds(1L));
        Trigger windowTrigger = EventTimeTriggers.afterEndOfWindow().withEarlyFirings(earlyFirings);
        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .tumble(Duration.ofSeconds(3L))
                .withEventTime(2)
                .triggering(windowTrigger)
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .withSendRetraction()
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Two key2 records at processing time 0, the rest at processing time 1.
        harness.setProcessingTime(0L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 3999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 3000L));
        harness.setProcessingTime(1L);
        harness.processElement(StreamRecordUtils.record("key1", 1, 20L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 0L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1998L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1000L));

        // Processing time 1000: an early result for key2 is expected.
        harness.setProcessingTime(1000L);
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 3000L, 6000L, 5999L));
        harness.processWatermark(new Watermark(999L));
        expected.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Processing time 1001: early results for both keys are expected.
        harness.setProcessingTime(1001L);
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        harness.processWatermark(new Watermark(1999L));
        harness.setProcessingTime(2001L);
        expected.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Snapshot, close the first harness, and restore into a fresh one around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(windowOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        harness.setProcessingTime(3001L);
        harness.processWatermark(new Watermark(2999L));
        expected.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // A new key2 record is expected to retract the earlier early result and emit an updated one.
        harness.processElement(StreamRecordUtils.record("key2", 1, 4999L));
        harness.processWatermark(new Watermark(3999L));
        harness.setProcessingTime(4001L);
        expected.add(new Watermark(3999L));
        expected.add(StreamRecordUtils.retractRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 3000L, 6000L, 5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // These two records are expected to produce no output.
        harness.processElement(StreamRecordUtils.record("key2", 1, 2001L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 2030L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.setProcessingTime(5100L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 5122L));
        harness.processWatermark(new Watermark(4999L));
        expected.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Watermark 5999: the previous key2 result is retracted and the updated one emitted.
        harness.processWatermark(new Watermark(5999L));
        expected.add(StreamRecordUtils.retractRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        expected.add(StreamRecordUtils.record("key2", 4L, 4L, 3000L, 6000L, 5999L));
        expected.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Advancing processing time further is expected to emit nothing new.
        harness.setProcessingTime(6001L);
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Records sent after watermark 7999 are expected to produce no output.
        harness.processElement(StreamRecordUtils.record("key2", 1, 2877L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 2899L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Builds an event-time tumbling window operator (3 s size) whose trigger is
     * {@code EventTimeTriggers.afterEndOfWindow()} with early firings every second of processing
     * time and late firings on every element, an allowed lateness of 3 s, and retractions enabled.
     * Checks the output as processing time and watermarks advance, including across a
     * snapshot/restore into a second harness.
     */
    @Test
    public void testEventTimeTumblingWindowsWithEarlyAndLateFirings() throws Exception {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        // Raw Trigger types are kept on purpose, mirroring how the triggers are handed over.
        Trigger earlyFirings = ProcessingTimeTriggers.every(Duration.ofSeconds(1L));
        Trigger lateFirings = ElementTriggers.every();
        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .tumble(Duration.ofSeconds(3L))
                .withEventTime(2)
                .triggering(EventTimeTriggers.afterEndOfWindow().withEarlyFirings(earlyFirings).withLateFirings(lateFirings))
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .withAllowedLateness(Duration.ofSeconds(3L))
                .withSendRetraction()
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Two key2 records at processing time 0, the rest at processing time 1.
        harness.setProcessingTime(0L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 3999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 3000L));
        harness.setProcessingTime(1L);
        harness.processElement(StreamRecordUtils.record("key1", 1, 20L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 0L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1998L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1999L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 1000L));

        // Processing time 1000: an early result for key2 is expected.
        harness.setProcessingTime(1000L);
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 3000L, 6000L, 5999L));
        harness.processWatermark(new Watermark(999L));
        expected.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Processing time 1001: early results for both keys are expected.
        harness.setProcessingTime(1001L);
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        harness.processWatermark(new Watermark(1999L));
        harness.setProcessingTime(2001L);
        expected.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Snapshot, close the first harness, and restore into a fresh one around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(windowOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        harness.setProcessingTime(3001L);
        harness.processWatermark(new Watermark(2999L));
        expected.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // A new key2 record is expected to retract the earlier early result and emit an updated one.
        harness.processElement(StreamRecordUtils.record("key2", 1, 4999L));
        harness.processWatermark(new Watermark(3999L));
        harness.setProcessingTime(4001L);
        expected.add(new Watermark(3999L));
        expected.add(StreamRecordUtils.retractRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 3000L, 6000L, 5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Each of the next two records is expected to produce a retraction plus an updated row
        // immediately, one key at a time.
        harness.processElement(StreamRecordUtils.record("key2", 1, 2001L));
        expected.add(StreamRecordUtils.retractRecord("key2", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key2", 4L, 4L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processElement(StreamRecordUtils.record("key1", 1, 2030L));
        expected.add(StreamRecordUtils.retractRecord("key1", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key1", 4L, 4L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.setProcessingTime(5100L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 5122L));
        harness.processWatermark(new Watermark(4999L));
        expected.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Watermark 5999: the previous key2 result is retracted and the updated one emitted.
        harness.processWatermark(new Watermark(5999L));
        expected.add(StreamRecordUtils.retractRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        expected.add(StreamRecordUtils.record("key2", 4L, 4L, 3000L, 6000L, 5999L));
        expected.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Advancing processing time further is expected to emit nothing new.
        harness.setProcessingTime(6001L);
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Records sent after watermark 7999 are expected to produce no output.
        harness.processElement(StreamRecordUtils.record("key2", 1, 2877L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 2899L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Builds a processing-time tumbling window operator (3 s size), checks the rows emitted as
     * the harness' processing time advances, and checks that the watermark latency gauge reads 0.
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .tumble(Duration.ofSeconds(3L))
                .withProcessingTime()
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Five records at processing time 3, then advance to 5000.
        harness.setProcessingTime(3L);
        harness.processElement(StreamRecordUtils.record("key2", 1, Long.MAX_VALUE));
        harness.processElement(StreamRecordUtils.record("key2", 1, 7000L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 7000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 7000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 7000L));
        harness.setProcessingTime(5000L);
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 3000L, 2999L));
        expected.add(StreamRecordUtils.record("key1", 2L, 2L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Three key1 records at processing time 5000, then advance to 7000.
        harness.processElement(StreamRecordUtils.record("key1", 1, 7000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 7000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 7000L));
        harness.setProcessingTime(7000L);
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 3000L, 6000L, 5999L));

        // Compared as objects (boxed Long against the gauge value), not as primitive longs.
        Object watermarkLatency = windowOperator.getWatermarkLatency().getValue();
        Assert.assertEquals(Long.valueOf(0L), watermarkLatency);
        assertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
    }

    /**
     * Builds an event-time session window operator (3 s gap, time field at index 2), restores it
     * from a snapshot half-way through, and checks the emitted rows, the watermark latency gauge
     * and the late-record-dropped counter.
     */
    @Test
    public void testEventTimeSessionWindows() throws Exception {
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .session(Duration.ofSeconds(3L))
                .withEventTime(2)
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // First batch of records, sent before the snapshot.
        harness.processElement(StreamRecordUtils.record("key2", 1, 0L));
        harness.processElement(StreamRecordUtils.record("key2", 2, 1000L));
        harness.processElement(StreamRecordUtils.record("key2", 3, 2500L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 10L));
        harness.processElement(StreamRecordUtils.record("key1", 2, 1000L));

        // Snapshot, close the first harness, and restore into a fresh one around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(windowOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // Compared as objects (boxed Long against the gauge value), not as primitive longs.
        Object latencyAfterRestore = windowOperator.getWatermarkLatency().getValue();
        Assert.assertEquals(Long.valueOf(0L), latencyAfterRestore);

        // Second batch of records, then watermark 12000.
        harness.processElement(StreamRecordUtils.record("key1", 3, 2500L));
        harness.processElement(StreamRecordUtils.record("key2", 4, 5501L));
        harness.processElement(StreamRecordUtils.record("key2", 5, 6000L));
        harness.processElement(StreamRecordUtils.record("key2", 5, 6000L));
        harness.processElement(StreamRecordUtils.record("key2", 6, 6050L));
        harness.processWatermark(new Watermark(12000L));
        expected.add(StreamRecordUtils.record("key1", 6L, 3L, 10L, 5500L, 5499L));
        expected.add(StreamRecordUtils.record("key2", 6L, 3L, 0L, 5500L, 5499L));
        expected.add(StreamRecordUtils.record("key2", 20L, 4L, 5501L, 9050L, 9049L));
        expected.add(new Watermark(12000L));

        // Presumably the key1 record at 4000 is the late one counted below, since it arrives after
        // watermark 12000 and no key1 row is expected for it.
        harness.processElement(StreamRecordUtils.record("key1", 3, 4000L));
        harness.processElement(StreamRecordUtils.record("key2", 10, 15000L));
        harness.processElement(StreamRecordUtils.record("key2", 20, 15000L));
        harness.processWatermark(new Watermark(17999L));
        expected.add(StreamRecordUtils.record("key2", 30L, 2L, 15000L, 18000L, 17999L));
        expected.add(new Watermark(17999L));
        assertor.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());

        // With processing time at 18000 and the last watermark at 17999, the gauge is expected to read 1.
        harness.setProcessingTime(18000L);
        Object latencyAtEnd = windowOperator.getWatermarkLatency().getValue();
        Assert.assertEquals(Long.valueOf(1L), latencyAtEnd);

        harness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
        Assert.assertEquals(1L, windowOperator.getNumLateRecordsDropped().getCount());
    }

    /**
     * Builds a processing-time session window operator (3 s gap) and checks the rows emitted as
     * the harness' processing time advances. Uses its own assertor instance, constructed the same
     * way as the class-level one.
     */
    @Test
    public void testProcessingTimeSessionWindows() throws Throwable {
        final String message = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator windowOperator = WindowOperatorBuilder.builder()
                .withInputFields(inputFieldTypes)
                .session(Duration.ofSeconds(3L))
                .withProcessingTime()
                .aggregate(new SumAndCountAggTimeWindow(), equaliser, accTypes, aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = createTestHarness(windowOperator);
        GenericRowRecordSortComparator rowComparator = new GenericRowRecordSortComparator(0, new VarCharType(Integer.MAX_VALUE));
        BaseRowHarnessAssertor localAssertor = new BaseRowHarnessAssertor(outputType.getFieldTypes(), rowComparator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Two key2 records at processing times 3 and 1000, then advance to 5000.
        harness.setProcessingTime(3L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 1L));
        harness.setProcessingTime(1000L);
        harness.processElement(StreamRecordUtils.record("key2", 1, 1002L));
        harness.setProcessingTime(5000L);
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 3L, 4000L, 3999L));
        localAssertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        // Five records at processing time 5000, then advance to 10000.
        harness.processElement(StreamRecordUtils.record("key2", 1, 5000L));
        harness.processElement(StreamRecordUtils.record("key2", 1, 5000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 5000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 5000L));
        harness.processElement(StreamRecordUtils.record("key1", 1, 5000L));
        harness.setProcessingTime(10000L);
        expected.add(StreamRecordUtils.record("key2", 2L, 2L, 5000L, 8000L, 7999L));
        expected.add(StreamRecordUtils.record("key1", 3L, 3L, 5000L, 8000L, 7999L));
        localAssertor.assertOutputEqualsSorted(message, expected, harness.getOutput());

        harness.close();
    }

    /**
     * Runs an event-time operator with {@link PointSessionWindowAssigner}, snapshots it after two
     * records, restores the snapshot into a fresh harness and checks the rows emitted once the
     * watermark reaches 12000.
     */
    @Test
    public void testPointSessions() throws Exception {
        closeCalled.set(0);

        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .assigner((WindowAssigner)new PointSessionWindowAssigner(3000L))
                .withEventTime(2)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes)
                .build();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();

        // Phase 1: feed two key2 records, then checkpoint and shut the first harness down.
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> firstHarness = this.createTestHarness(operator);
        firstHarness.open();
        firstHarness.processElement(StreamRecordUtils.record("key2", 1, 0L));
        firstHarness.processElement(StreamRecordUtils.record("key2", 33, 1000L));
        OperatorSubtaskState checkpoint = firstHarness.snapshot(0L, 0L);
        firstHarness.close();

        // Phase 2: restore the checkpoint into a new harness built around the same operator.
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> restoredHarness = this.createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();

        restoredHarness.processElement(StreamRecordUtils.record("key2", 33, 2500L));
        restoredHarness.processElement(StreamRecordUtils.record("key1", 1, 10L));
        restoredHarness.processElement(StreamRecordUtils.record("key1", 2, 1000L));
        restoredHarness.processElement(StreamRecordUtils.record("key1", 33, 2500L));
        restoredHarness.processWatermark(new Watermark(12000L));

        expected.add(StreamRecordUtils.record("key1", 36L, 3L, 10L, 4000L, 3999L));
        expected.add(StreamRecordUtils.record("key2", 67L, 3L, 0L, 3000L, 2999L));
        expected.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expected, restoredHarness.getOutput());

        restoredHarness.close();
        // One close() per harness lifecycle is expected on the aggregate function.
        Assert.assertEquals("Close was not called.", 2L, (long)closeCalled.get());
    }

    /**
     * Checks a 2 second event-time tumbling window with 500 ms allowed lateness and retraction
     * enabled: a record inside the lateness bound retracts and re-emits the window result, and a
     * record arriving after that is counted as dropped.
     */
    @Test
    public void testLateness() throws Exception {
        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .tumble(Duration.ofSeconds(2L))
                .withEventTime(2)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes)
                .withAllowedLateness(Duration.ofMillis(500L))
                .withSendRetraction()
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> harness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        harness.open();

        // Watermark 1500: only the watermark itself is expected downstream.
        harness.processElement(StreamRecordUtils.record("key2", 1, 500L));
        harness.processWatermark(new Watermark(1500L));
        expected.add(new Watermark(1500L));

        // Watermark 2300: the first result row for key2 (sum 2, count 2) is expected.
        harness.processElement(StreamRecordUtils.record("key2", 1, 1300L));
        harness.processWatermark(new Watermark(2300L));
        GenericRow firstResult = GenericRow.of(BinaryString.fromString("key2"), 2L, 2L, 0L, 2000L, 1999L);
        expected.add(new StreamRecord<>(firstResult));
        expected.add(new Watermark(2300L));

        // Record at 1997 after watermark 2300: the previous row is retracted and an updated row emitted.
        harness.processElement(StreamRecordUtils.record("key2", 1, 1997L));
        harness.processWatermark(new Watermark(6000L));
        BaseRow retraction = BaseRowUtil.setRetract(GenericRow.copyReference(firstResult));
        expected.add(new StreamRecord<>(retraction));
        expected.add(StreamRecordUtils.record("key2", 3L, 3L, 0L, 2000L, 1999L));
        expected.add(new Watermark(6000L));

        // Record at 1998 after watermark 6000: no row is expected, only the next watermark.
        harness.processElement(StreamRecordUtils.record("key2", 1, 1998L));
        harness.processWatermark(new Watermark(7000L));
        expected.add(new Watermark(7000L));

        this.assertor.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());
        Assert.assertEquals(1L, (long)operator.getNumLateRecordsDropped().getCount());
        harness.close();
    }

    /**
     * Places a record so close to {@code Long.MAX_VALUE} that {@code maxTimestamp + lateness}
     * overflows, sends a watermark that lies before the window's max timestamp, and then checks
     * that the window result is still emitted when the watermark reaches the max timestamp.
     */
    @Test
    public void testCleanupTimeOverflow() throws Exception {
        long windowSize = 1000L;
        long lateness = 2000L;
        // Record timestamp and intermediate watermark, both expressed relative to Long.MAX_VALUE.
        long timestamp = Long.MAX_VALUE - 1750L;
        long earlyWatermark = Long.MAX_VALUE - 1500L;

        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .tumble(Duration.ofMillis(windowSize))
                .withEventTime(2)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes)
                .withAllowedLateness(Duration.ofMillis(lateness))
                .withSendRetraction()
                .build();
        // Same keyed harness construction as the other tests in this class.
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();

        // Ask a standalone assigner which window the record falls into.
        TumblingWindowAssigner windowAssigner = TumblingWindowAssigner.of(Duration.ofMillis(windowSize));
        Collection<TimeWindow> windows = windowAssigner.assignWindows(GenericRow.of(BinaryString.fromString("key2"), 1), timestamp);
        TimeWindow window = windows.iterator().next();

        testHarness.processElement(StreamRecordUtils.record("key2", 1, timestamp));

        // Sanity checks: the naive cleanup time wraps around and lands below the early watermark.
        Assert.assertTrue(window.maxTimestamp() + lateness < window.maxTimestamp());
        Assert.assertTrue(window.maxTimestamp() + lateness < earlyWatermark);

        testHarness.processWatermark(new Watermark(earlyWatermark));

        // The early watermark is still before the end of the only window.
        Assert.assertTrue(earlyWatermark < window.maxTimestamp());
        Assert.assertTrue(window.maxTimestamp() < Long.MAX_VALUE);

        // This watermark reaches the window's max timestamp; the result row is expected after it.
        testHarness.processWatermark(new Watermark(window.maxTimestamp()));

        expected.add(new Watermark(earlyWatermark));
        expected.add(StreamRecordUtils.record("key2", 1L, 1L, window.getStart(), window.getEnd(), window.maxTimestamp()));
        expected.add(new Watermark(window.maxTimestamp()));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Feeds a single record into a 2 second event-time tumbling window with 1 ms allowed lateness
     * and advances the watermark past the window in several steps, expecting exactly one result
     * row in between the forwarded watermarks.
     */
    @Test
    public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        // Window size in seconds and allowed lateness in milliseconds; both feed the builder below.
        int windowSize = 2;
        long lateness = 1L;
        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .tumble(Duration.ofSeconds(windowSize))
                .withEventTime(2)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes)
                .withAllowedLateness(Duration.ofMillis(lateness))
                .withSendRetraction()
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();

        testHarness.processElement(StreamRecordUtils.record("key2", 1, 1000L));
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2000L));
        testHarness.processWatermark(new Watermark(5000L));

        expected.add(new Watermark(1599L));
        expected.add(StreamRecordUtils.record("key2", 1L, 1L, 0L, 2000L, 1999L));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2000L));
        expected.add(new Watermark(5000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Exercises a tumbling count window of size 3, including a snapshot/restore cycle, and checks
     * the row emitted each time a key has contributed three more records.
     */
    @Test
    public void testTumblingCountWindow() throws Exception {
        closeCalled.set(0);
        // Number of records per window; passed to the builder instead of a duplicated literal.
        long windowSize = 3L;
        // Count windows contribute a single BIGINT column (the window id) to the output.
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType()};
        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .countWindow(windowSize)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggCountWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();

        // Three key2 records and two key1 records: only key2 is expected to produce a row.
        testHarness.processElement(StreamRecordUtils.record("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.record("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.record("key1", 2, 1000L));
        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);
        expectedOutput.add(StreamRecordUtils.record("key2", 6L, 3L, 0L));
        expectedOutput.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, close, and restore into a fresh harness around the same operator.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();

        // The third key1 record arrives after the restore.
        testHarness.processElement(StreamRecordUtils.record("key1", 2, 2500L));
        expectedOutput.add(StreamRecordUtils.record("key1", 5L, 3L, 0L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.processElement(StreamRecordUtils.record("key2", 4, 5501L));
        testHarness.processElement(StreamRecordUtils.record("key2", 5, 6000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 5, 6000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 6, 6050L));
        expectedOutput.add(StreamRecordUtils.record("key2", 14L, 3L, 1L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.processElement(StreamRecordUtils.record("key1", 3, 4000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 10, 15000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 20, 15000L));
        expectedOutput.add(StreamRecordUtils.record("key2", 36L, 3L, 2L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.processElement(StreamRecordUtils.record("key1", 2, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key1", 2, 2500L));
        expectedOutput.add(StreamRecordUtils.record("key1", 7L, 3L, 1L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
        Assert.assertEquals("Close was not called.", 2L, (long)closeCalled.get());
    }

    /**
     * Exercises a sliding count window (size 5, slide 3), including a snapshot/restore cycle, and
     * checks the rows emitted as each key accumulates records.
     */
    @Test
    public void testSlidingCountWindow() throws Exception {
        closeCalled.set(0);
        // Window size and slide in records; passed to the builder instead of duplicated literals.
        long windowSize = 5L;
        long windowSlide = 3L;
        // Count windows contribute a single BIGINT column (the window id) to the output.
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType()};
        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .countWindow(windowSize, windowSlide)
                .aggregate((NamespaceAggsHandleFunction)new SumAndCountAggCountWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, windowTypes)
                .build();
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();

        // Five key2 records and two key1 records: only key2 is expected to produce a row.
        testHarness.processElement(StreamRecordUtils.record("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.record("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key2", 4, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key2", 5, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.record("key1", 2, 1000L));
        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);
        expectedOutput.add(StreamRecordUtils.record("key2", 15L, 5L, 0L));
        expectedOutput.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, close, and restore into a fresh harness around the same operator.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();

        // key1 reaches five records after the restore.
        testHarness.processElement(StreamRecordUtils.record("key1", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key1", 4, 2500L));
        testHarness.processElement(StreamRecordUtils.record("key1", 5, 2500L));
        expectedOutput.add(StreamRecordUtils.record("key1", 15L, 5L, 0L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.processElement(StreamRecordUtils.record("key2", 6, 6000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 7, 6000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 8, 6050L));
        testHarness.processElement(StreamRecordUtils.record("key2", 9, 6050L));
        expectedOutput.add(StreamRecordUtils.record("key2", 30L, 5L, 1L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.processElement(StreamRecordUtils.record("key1", 6, 4000L));
        testHarness.processElement(StreamRecordUtils.record("key1", 7, 4000L));
        testHarness.processElement(StreamRecordUtils.record("key1", 8, 4000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 10, 15000L));
        testHarness.processElement(StreamRecordUtils.record("key2", 11, 15000L));
        expectedOutput.add(StreamRecordUtils.record("key1", 30L, 5L, 1L));
        expectedOutput.add(StreamRecordUtils.record("key2", 45L, 5L, 2L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
        Assert.assertEquals("Close was not called.", 2L, (long)closeCalled.get());
    }

    /**
     * Wraps {@code operator} in a keyed one-input test harness that uses this test's key selector
     * and key type.
     *
     * @param operator the window operator under test
     * @return a new, not yet opened harness around {@code operator}
     */
    private OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createTestHarness(WindowOperator operator) throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedHarness =
                new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)this.keySelector, this.keyType);
        return keyedHarness;
    }

    /**
     * {@link RecordEqualiser} for tests: converts both rows to {@link GenericRow} using the
     * aggregate-result types followed by the window types, then compares the converted rows.
     */
    private static class GenericRowEqualiser implements RecordEqualiser {
        /** Aggregate result types first, window property types after them. */
        private final LogicalType[] fieldTypes;

        GenericRowEqualiser(LogicalType[] aggResultTypes, LogicalType[] windowTypes) {
            LogicalType[] combined = new LogicalType[aggResultTypes.length + windowTypes.length];
            System.arraycopy(aggResultTypes, 0, combined, 0, aggResultTypes.length);
            System.arraycopy(windowTypes, 0, combined, aggResultTypes.length, windowTypes.length);
            this.fieldTypes = combined;
        }

        /** Returns whether the two rows are equal after conversion to {@link GenericRow}. */
        public boolean equals(BaseRow row1, BaseRow row2) {
            GenericRow first = this.toGeneric(row1);
            GenericRow second = this.toGeneric(row2);
            return first.equals((Object)second);
        }

        /** Same comparison as {@code equals}, delegated to {@code GenericRow.equalsWithoutHeader}. */
        public boolean equalsWithoutHeader(BaseRow row1, BaseRow row2) {
            GenericRow first = this.toGeneric(row1);
            GenericRow second = this.toGeneric(row2);
            return first.equalsWithoutHeader((BaseRow)second);
        }

        /** Converts {@code row} to a {@link GenericRow} according to {@link #fieldTypes}. */
        private GenericRow toGeneric(BaseRow row) {
            return BaseRowUtil.toGenericRow((BaseRow)row, (LogicalType[])this.fieldTypes);
        }
    }

    /**
     * Test aggregate that keeps a running sum of the int at input field 1 and a count of the
     * non-null inputs. Every method that touches the accumulator fails the test when
     * {@link #open} has not been called first, and {@link #close} increments the enclosing
     * test's {@code closeCalled} counter.
     *
     * @param <W> window (namespace) type
     */
    private static class SumAndCountAgg<W extends Window>
    implements NamespaceAggsHandleFunction<W> {
        private static final long serialVersionUID = 2822222597580664436L;
        /** Set by {@link #open}; also read directly by subclasses. */
        protected boolean openCalled = false;
        long sum;
        /** Null flag for {@link #sum}; only updated by {@link #setAccumulators}. */
        boolean sumIsNull;
        long count;
        /** Null flag for {@link #count}; only updated by {@link #setAccumulators}. */
        boolean countIsNull;

        private SumAndCountAgg() {
        }

        public void open(StateDataViewStore store) throws Exception {
            this.openCalled = true;
        }

        /** Loads sum (field 0) and count (field 1) from {@code acc}, recording which of them are null. */
        public void setAccumulators(W namespace, BaseRow acc) throws Exception {
            this.assertOpened();
            this.sumIsNull = acc.isNullAt(0);
            if (!this.sumIsNull) {
                this.sum = acc.getLong(0);
            }
            this.countIsNull = acc.isNullAt(1);
            if (!this.countIsNull) {
                this.count = acc.getLong(1);
            }
        }

        /** Adds input field 1 to the sum and bumps the count; null inputs are ignored. */
        public void accumulate(BaseRow inputRow) throws Exception {
            this.assertOpened();
            if (!inputRow.isNullAt(1)) {
                this.sum += (long)inputRow.getInt(1);
                ++this.count;
            }
        }

        /** Subtracts input field 1 from the sum and decrements the count; null inputs are ignored. */
        public void retract(BaseRow inputRow) throws Exception {
            this.assertOpened();
            if (!inputRow.isNullAt(1)) {
                this.sum -= (long)inputRow.getInt(1);
                --this.count;
            }
        }

        /** Adds the non-null sum and count of {@code otherAcc} onto the current values. */
        public void merge(W w, BaseRow otherAcc) throws Exception {
            this.assertOpened();
            if (!otherAcc.isNullAt(0)) {
                this.sum += otherAcc.getLong(0);
            }
            if (!otherAcc.isNullAt(1)) {
                this.count += otherAcc.getLong(1);
            }
        }

        /** Returns a fresh accumulator row with sum and count both set to 0. */
        public BaseRow createAccumulators() {
            this.assertOpened();
            GenericRow acc = new GenericRow(2);
            acc.setField(0, (Object)0L);
            acc.setField(1, (Object)0L);
            return acc;
        }

        /** Returns the current (sum, count) row; a field is left unset when its null flag is set. */
        public BaseRow getAccumulators() throws Exception {
            this.assertOpened();
            return this.sumAndCountRow();
        }

        /** Returns the same (sum, count) row as {@link #getAccumulators}; the namespace is not used here. */
        public BaseRow getValue(W namespace) throws Exception {
            this.assertOpened();
            return this.sumAndCountRow();
        }

        public void cleanup(W window) {
        }

        public void close() {
            closeCalled.incrementAndGet();
        }

        /** Fails the running test when {@link #open} was never invoked on this instance. */
        private void assertOpened() {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
        }

        /** Builds a two-field row holding sum and count, skipping fields flagged as null. */
        private GenericRow sumAndCountRow() {
            GenericRow row = new GenericRow(2);
            if (!this.sumIsNull) {
                row.setField(0, (Object)this.sum);
            }
            if (!this.countIsNull) {
                row.setField(1, (Object)this.count);
            }
            return row;
        }
    }

    /**
     * {@link SumAndCountAgg} for count windows: the emitted row is (sum, count, window id).
     */
    private static class SumAndCountAggCountWindow extends SumAndCountAgg<CountWindow> {
        private static final long serialVersionUID = -2634639678371135643L;

        private SumAndCountAggCountWindow() {
        }

        /** Returns a three-field row: sum, count and the id of {@code namespace}. */
        @Override
        public BaseRow getValue(CountWindow namespace) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            final GenericRow result = new GenericRow(3);
            // Fields flagged as null are left unset.
            if (!this.sumIsNull) {
                result.setField(0, (Object)this.sum);
            }
            if (!this.countIsNull) {
                result.setField(1, (Object)this.count);
            }
            result.setField(2, (Object)namespace.getId());
            return result;
        }
    }

    /**
     * {@link SumAndCountAgg} for time windows: the emitted row is
     * (sum, count, window start, window end, window max timestamp).
     */
    private static class SumAndCountAggTimeWindow extends SumAndCountAgg<TimeWindow> {
        private static final long serialVersionUID = 2062031590687738047L;

        private SumAndCountAggTimeWindow() {
        }

        /** Returns a five-field row: sum, count, then start, end and max timestamp of {@code namespace}. */
        @Override
        public BaseRow getValue(TimeWindow namespace) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            final GenericRow result = new GenericRow(5);
            // Fields flagged as null are left unset.
            if (!this.sumIsNull) {
                result.setField(0, (Object)this.sum);
            }
            if (!this.countIsNull) {
                result.setField(1, (Object)this.count);
            }
            result.setField(2, (Object)namespace.getStart());
            result.setField(3, (Object)namespace.getEnd());
            result.setField(4, (Object)namespace.maxTimestamp());
            return result;
        }
    }

    /**
     * Session assigner for tests: a row whose int field 1 equals 33 gets the zero-length window
     * [timestamp, timestamp]; every other row gets [timestamp, timestamp + sessionTimeout].
     */
    private static class PointSessionWindowAssigner extends SessionWindowAssigner {
        private static final long serialVersionUID = 1L;
        private final long sessionTimeout;

        /** Creates an event-time assigner with the given timeout. */
        private PointSessionWindowAssigner(long sessionTimeout) {
            this(sessionTimeout, true);
        }

        private PointSessionWindowAssigner(long sessionTimeout, boolean isEventTime) {
            super(sessionTimeout, isEventTime);
            this.sessionTimeout = sessionTimeout;
        }

        public Collection<TimeWindow> assignWindows(BaseRow element, long timestamp) {
            // The value 33 in field 1 marks a "point" element whose window has no extent.
            boolean isPoint = element.getInt(1) == 33;
            long windowEnd = isPoint ? timestamp : timestamp + this.sessionTimeout;
            return Collections.singletonList(new TimeWindow(timestamp, windowEnd));
        }

        /** Returns a new event-time assigner with the same timeout. */
        public SessionWindowAssigner withEventTime() {
            return new PointSessionWindowAssigner(this.sessionTimeout, true);
        }

        /** Returns a new processing-time assigner with the same timeout. */
        public SessionWindowAssigner withProcessingTime() {
            return new PointSessionWindowAssigner(this.sessionTimeout, false);
        }
    }
}

