package org.apache.flink.table.runtime.operators.window;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.JoinedRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest.class */
public class WindowOperatorTest {
    // Run-mode flag injected through the parameterized constructor; selects the table-aggregate
    // variants of the test aggregate functions when true.
    private final boolean isTableAggregate;
    // Shared aggregate-function instances handed out by getTimeWindowAggFunction() /
    // getCountWindowAggFunction(). They are static, so their state is shared between tests.
    private static final SumAndCountAggTimeWindow sumAndCountAggTimeWindow = new SumAndCountAggTimeWindow();
    private static final SumAndCountTableAggTimeWindow sumAndCountTableAggTimeWindow = new SumAndCountTableAggTimeWindow();
    private static final SumAndCountAggCountWindow sumAndCountAggCountWindow = new SumAndCountAggCountWindow();
    private static final SumAndCountTableAggCountWindow sumAndCountTableAggCountWindow = new SumAndCountTableAggCountWindow();
    // Incremented by SumAndCountAggBase.close(); tests reset it to 0 before running.
    private static AtomicInteger closeCalled = new AtomicInteger(0);
    // Input row layout used by the tests: (VARCHAR, INT, BIGINT).
    private LogicalType[] inputFieldTypes = {new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()};
    // Output row layout handed to the assertor: one VARCHAR followed by five BIGINT fields.
    private RowDataTypeInfo outputType = new RowDataTypeInfo(new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()});
    private LogicalType[] aggResultTypes = {new BigIntType(), new BigIntType()};
    private LogicalType[] accTypes = {new BigIntType(), new BigIntType()};
    private LogicalType[] windowTypes = {new BigIntType(), new BigIntType(), new BigIntType()};
    // The initialisers below read fields declared above them; keep the declaration order intact.
    private GenericRowEqualiser equaliser = new GenericRowEqualiser(this.accTypes, this.windowTypes);
    // Keys records by field 0 of the input row.
    private BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new int[]{0}, this.inputFieldTypes);
    // NOTE(review): "m92getProducedType" looks like a decompiler-renamed getProducedType() — confirm
    // against the real BinaryRowDataKeySelector API.
    private TypeInformation<RowData> keyType = this.keySelector.m92getProducedType();
    // Compares harness output against expectations, sorting with a comparator built on field 0.
    private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(this.outputType.getFieldTypes(), new GenericRowRecordSortComparator(0, new VarCharType(Integer.MAX_VALUE)));

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$GenericRowEqualiser.class */
    /**
     * Test-only {@link RecordEqualiser} that compares two rows by converting both to generic
     * rows over a fixed field-type layout and delegating to {@code equals}.
     *
     * <p>The layout is the first type array followed by the second one.
     */
    private static class GenericRowEqualiser implements RecordEqualiser {
        private final LogicalType[] fieldTypes;

        /**
         * @param logicalTypeArr types that form the leading part of the compared layout
         * @param logicalTypeArr2 types appended after {@code logicalTypeArr}
         */
        GenericRowEqualiser(LogicalType[] logicalTypeArr, LogicalType[] logicalTypeArr2) {
            int headLength = logicalTypeArr.length;
            int tailLength = logicalTypeArr2.length;
            LogicalType[] combined = new LogicalType[headLength + tailLength];
            System.arraycopy(logicalTypeArr, 0, combined, 0, headLength);
            System.arraycopy(logicalTypeArr2, 0, combined, headLength, tailLength);
            this.fieldTypes = combined;
        }

        /** Returns whether both rows are equal once converted with the configured field types. */
        public boolean equals(RowData rowData, RowData rowData2) {
            return RowDataUtil.toGenericRow(rowData, this.fieldTypes)
                    .equals(RowDataUtil.toGenericRow(rowData2, this.fieldTypes));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$PointSessionWindowAssigner.class */
    /**
     * Session window assigner used by the tests: a record whose int field 1 equals 33 is put
     * into a zero-length window {@code [ts, ts)}, every other record into
     * {@code [ts, ts + sessionTimeout)}.
     */
    private static class PointSessionWindowAssigner extends SessionWindowAssigner {
        private static final long serialVersionUID = 1;
        private final long sessionTimeout;

        /** Creates an assigner with the given timeout, passing {@code true} as the time-mode flag. */
        private PointSessionWindowAssigner(long j) {
            this(j, true);
        }

        /**
         * @param j session timeout, forwarded to the parent assigner and kept locally
         * @param z flag forwarded to the parent constructor ({@code true} is used by the
         *     event-time factory method below, {@code false} by the processing-time one)
         */
        private PointSessionWindowAssigner(long j, boolean z) {
            super(j, z);
            this.sessionTimeout = j;
        }

        /** Assigns exactly one window to the record, see the class comment for the rule. */
        public Collection<TimeWindow> assignWindows(RowData rowData, long j) {
            if (rowData.getInt(1) == 33) {
                // Marker value: produce a "point" window that starts and ends at the timestamp.
                return Collections.singletonList(new TimeWindow(j, j));
            }
            return Collections.singletonList(new TimeWindow(j, j + this.sessionTimeout));
        }

        /* renamed from: withEventTime, reason: merged with bridge method [inline-methods] */
        public SessionWindowAssigner m65withEventTime() {
            long timeout = this.sessionTimeout;
            return new PointSessionWindowAssigner(timeout, true);
        }

        /* renamed from: withProcessingTime, reason: merged with bridge method [inline-methods] */
        public SessionWindowAssigner m64withProcessingTime() {
            long timeout = this.sessionTimeout;
            return new PointSessionWindowAssigner(timeout, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$SumAndCountAggBase.class */
    /**
     * Shared state and accumulator logic for the sum-and-count test aggregate functions.
     *
     * <p>The accumulator is a two-field row: field 0 holds the sum, field 1 the count. Input
     * records contribute their int field 1 to the sum. Every operation except {@link #open},
     * {@link #cleanup} and {@link #close} fails the test if {@link #open} has not been called.
     *
     * @param <W> window (namespace) type
     */
    public static class SumAndCountAggBase<W extends Window> {
        boolean openCalled;
        long sum;
        boolean sumIsNull;
        long count;
        boolean countIsNull;
        protected transient JoinedRowData result;

        private SumAndCountAggBase() {
        }

        /** Fails the running test when the function is used before {@link #open}. */
        private void assertOpened() {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
        }

        /** Marks the function as opened and allocates the reusable joined result row. */
        public void open(StateDataViewStore stateDataViewStore) throws Exception {
            this.result = new JoinedRowData();
            this.openCalled = true;
        }

        /** Loads sum and count (and their null flags) from the given accumulator row. */
        public void setAccumulators(W w, RowData rowData) throws Exception {
            assertOpened();
            this.sumIsNull = rowData.isNullAt(0);
            if (!this.sumIsNull) {
                this.sum = rowData.getLong(0);
            }
            this.countIsNull = rowData.isNullAt(1);
            if (!this.countIsNull) {
                this.count = rowData.getLong(1);
            }
        }

        /** Adds the record's int field 1 to the sum and bumps the count; null values are skipped. */
        public void accumulate(RowData rowData) throws Exception {
            assertOpened();
            if (!rowData.isNullAt(1)) {
                this.sum += rowData.getInt(1);
                this.count++;
            }
        }

        /** Inverse of {@link #accumulate}: subtracts the value and decrements the count. */
        public void retract(RowData rowData) throws Exception {
            assertOpened();
            if (!rowData.isNullAt(1)) {
                this.sum -= rowData.getInt(1);
                this.count--;
            }
        }

        /** Folds another accumulator row into the current sum and count, ignoring null fields. */
        public void merge(W w, RowData rowData) throws Exception {
            assertOpened();
            if (!rowData.isNullAt(0)) {
                this.sum += rowData.getLong(0);
            }
            if (!rowData.isNullAt(1)) {
                this.count += rowData.getLong(1);
            }
        }

        /** Returns a fresh accumulator row with sum and count both set to zero. */
        public RowData createAccumulators() {
            assertOpened();
            GenericRowData initial = new GenericRowData(2);
            initial.setField(0, 0L);
            initial.setField(1, 0L);
            return initial;
        }

        /** Returns the current accumulator; a field is left unset when its null flag is set. */
        public RowData getAccumulators() throws Exception {
            assertOpened();
            GenericRowData acc = new GenericRowData(2);
            if (!this.sumIsNull) {
                acc.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                acc.setField(1, Long.valueOf(this.count));
            }
            return acc;
        }

        /** No per-window state to clean up. */
        public void cleanup(W w) {
        }

        /** Records the call in the test-wide close counter. */
        public void close() {
            WindowOperatorTest.closeCalled.incrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$SumAndCountAggCountWindow.class */
    /**
     * Sum-and-count aggregate function for {@link CountWindow} namespaces.
     *
     * <p>The emitted value has three fields: sum, count and the window id. Accumulator handling
     * is inherited from {@link SumAndCountAggBase}.
     *
     * <p>The bridge methods for {@code cleanup}, {@code merge} and {@code setAccumulators} are
     * intentionally not declared here: the compiler generates them, and hand-written copies
     * taking {@code Object} clash with the generic interface methods.
     */
    public static class SumAndCountAggCountWindow extends SumAndCountAggBase<CountWindow> implements NamespaceAggsHandleFunction<CountWindow> {
        private static final long serialVersionUID = -2634639678371135643L;

        private SumAndCountAggCountWindow() {
            super();
        }

        /**
         * Builds the result row for the given window.
         *
         * @param countWindow window whose id is written to field 2
         * @return a 3-field row (sum, count, window id); sum/count stay unset when flagged null
         */
        public RowData getValue(CountWindow countWindow) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(3);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            genericRowData.setField(2, Long.valueOf(countWindow.getId()));
            return genericRowData;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$SumAndCountAggTimeWindow.class */
    /**
     * Sum-and-count aggregate function for {@link TimeWindow} namespaces.
     *
     * <p>The emitted value has five fields: sum, count, window start, window end and the
     * window's max timestamp. Accumulator handling is inherited from {@link SumAndCountAggBase}.
     *
     * <p>The bridge methods for {@code cleanup}, {@code merge} and {@code setAccumulators} are
     * intentionally not declared here: the compiler generates them, and hand-written copies
     * taking {@code Object} clash with the generic interface methods.
     */
    public static class SumAndCountAggTimeWindow extends SumAndCountAggBase<TimeWindow> implements NamespaceAggsHandleFunction<TimeWindow> {
        private static final long serialVersionUID = 2062031590687738047L;

        private SumAndCountAggTimeWindow() {
            super();
        }

        /**
         * Builds the result row for the given window.
         *
         * @param timeWindow window whose start, end and max timestamp fill fields 2 to 4
         * @return a 5-field row; sum/count stay unset when flagged null
         */
        public RowData getValue(TimeWindow timeWindow) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(5);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            genericRowData.setField(2, Long.valueOf(timeWindow.getStart()));
            genericRowData.setField(3, Long.valueOf(timeWindow.getEnd()));
            genericRowData.setField(4, Long.valueOf(timeWindow.maxTimestamp()));
            return genericRowData;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$SumAndCountTableAggCountWindow.class */
    /**
     * Table-aggregate variant of the sum-and-count function for {@link CountWindow} namespaces.
     *
     * <p>For every emission it joins the key row with a 3-field row (sum, count, window id) and
     * collects the joined row twice, so each firing yields two identical records.
     *
     * <p>The bridge methods for {@code emitValue}, {@code cleanup}, {@code merge} and
     * {@code setAccumulators} are intentionally not declared here: the compiler generates them,
     * and hand-written copies taking {@code Object} clash with the generic interface methods.
     */
    public static class SumAndCountTableAggCountWindow extends SumAndCountAggBase<CountWindow> implements NamespaceTableAggsHandleFunction<CountWindow> {
        private static final long serialVersionUID = -2634639678371135643L;

        private SumAndCountTableAggCountWindow() {
            super();
        }

        /**
         * Emits the current aggregate for the given window.
         *
         * @param countWindow window whose id is written to field 2 of the aggregate row
         * @param rowData row placed on the left side of the joined output row
         * @param collector receives the joined row, twice
         */
        public void emitValue(CountWindow countWindow, RowData rowData, Collector<RowData> collector) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(3);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            genericRowData.setField(2, Long.valueOf(countWindow.getId()));
            this.result.replace(rowData, genericRowData);
            // Deliberately emitted twice: the tests expect doubled output in table-aggregate mode.
            collector.collect(this.result);
            collector.collect(this.result);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/WindowOperatorTest$SumAndCountTableAggTimeWindow.class */
    /**
     * Table-aggregate variant of the sum-and-count function for {@link TimeWindow} namespaces.
     *
     * <p>For every emission it joins the key row with a 5-field row (sum, count, window start,
     * window end, max timestamp) and collects the joined row twice, so each firing yields two
     * identical records.
     *
     * <p>The bridge methods for {@code emitValue}, {@code cleanup}, {@code merge} and
     * {@code setAccumulators} are intentionally not declared here: the compiler generates them,
     * and hand-written copies taking {@code Object} clash with the generic interface methods.
     */
    public static class SumAndCountTableAggTimeWindow extends SumAndCountAggBase<TimeWindow> implements NamespaceTableAggsHandleFunction<TimeWindow> {
        private static final long serialVersionUID = 2062031590687738047L;

        private SumAndCountTableAggTimeWindow() {
            super();
        }

        /**
         * Emits the current aggregate for the given window.
         *
         * @param timeWindow window whose start, end and max timestamp fill fields 2 to 4
         * @param rowData row placed on the left side of the joined output row
         * @param collector receives the joined row, twice
         */
        public void emitValue(TimeWindow timeWindow, RowData rowData, Collector<RowData> collector) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(5);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            genericRowData.setField(2, Long.valueOf(timeWindow.getStart()));
            genericRowData.setField(3, Long.valueOf(timeWindow.getEnd()));
            genericRowData.setField(4, Long.valueOf(timeWindow.maxTimestamp()));
            this.result.replace(rowData, genericRowData);
            // Deliberately emitted twice: the tests expect doubled output in table-aggregate mode.
            collector.collect(this.result);
            collector.collect(this.result);
        }
    }

    /**
     * Supplies the two parameter sets for this test class: first {@code false}, then
     * {@code true}, each passed to the constructor as the {@code isTableAggregate} flag.
     */
    @Parameterized.Parameters(name = "isTableAggregate = {0}")
    public static Collection<Object[]> runMode() {
        Object[] plainAggregate = {Boolean.FALSE};
        Object[] tableAggregate = {Boolean.TRUE};
        return Arrays.asList(plainAggregate, tableAggregate);
    }

    /**
     * Creates a test instance for one run mode.
     *
     * @param z {@code true} to exercise the table-aggregate functions, {@code false} for the
     *     plain aggregate functions
     */
    public WindowOperatorTest(boolean z) {
        isTableAggregate = z;
    }

    /** Returns the shared time-window aggregate function that matches the current run mode. */
    private NamespaceAggsHandleFunctionBase getTimeWindowAggFunction() {
        if (this.isTableAggregate) {
            return sumAndCountTableAggTimeWindow;
        }
        return sumAndCountAggTimeWindow;
    }

    /** Returns the shared count-window aggregate function that matches the current run mode. */
    private NamespaceAggsHandleFunctionBase getCountWindowAggFunction() {
        if (this.isTableAggregate) {
            return sumAndCountTableAggCountWindow;
        }
        return sumAndCountAggCountWindow;
    }

    /**
     * Wraps an expected record in a queue, repeating it when requested.
     *
     * @param z when {@code true} the record is added twice, otherwise once
     * @param streamRecord the expected record
     * @return a new queue holding one or two references to {@code streamRecord}
     */
    private ConcurrentLinkedQueue<Object> doubleRecord(boolean z, StreamRecord<RowData> streamRecord) {
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        int copies = z ? 2 : 1;
        for (int i = 0; i < copies; i++) {
            expected.add(streamRecord);
        }
        return expected;
    }

    @Test
    /**
     * Exercises a window operator built with {@code sliding(3s, 1s)} and {@code withEventTime(2)}.
     *
     * <p>Eight records for two keys are fed in, then the watermark is advanced step by step and
     * the output is compared with the expected rows after each step. Midway the harness is
     * snapshotted and closed, and a second harness is restored from that snapshot to verify the
     * remaining firings. Finally the test asserts that the close counter equals 2.
     */
    public void testEventTimeSlidingWindows() throws Exception {
        // Reset the shared counter so the final assertion only sees this test's close() calls.
        closeCalled.set(0);
        WindowOperator aggregateAndBuild = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withEventTime(2).aggregateAndBuild(getTimeWindowAggFunction(), this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(aggregateAndBuild);
        createTestHarness.open();
        // Accumulates everything the harness is expected to have emitted so far.
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        // Input: the third field of each record is the value passed as field index 2 (see withEventTime(2)).
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // Expected rows are (key, sum, count, window start, window end, max timestamp); they are
        // added twice in table-aggregate mode via doubleRecord.
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, -2000L, 1000L, 999L)));
        concurrentLinkedQueue.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, -1000L, 2000L, 1999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, -1000L, 2000L, 1999L)));
        concurrentLinkedQueue.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
        concurrentLinkedQueue.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        // Snapshot, close the first harness, and continue on a second harness restored from the snapshot.
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        // The restored harness starts with an empty output queue, so expectations start over too.
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness2 = createTestHarness(aggregateAndBuild);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 5L, 5L, 1000L, 4000L, 3999L)));
        concurrentLinkedQueue.add(new Watermark(3999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, 2000L, 5000L, 4999L)));
        concurrentLinkedQueue.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
        concurrentLinkedQueue.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        // Only the watermarks themselves are expected for these last two steps.
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.close();
        // Two harnesses were closed above, so the counter is expected to be exactly 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    @Test
    /**
     * Exercises a window operator built with {@code sliding(3s, 1s)} and
     * {@code withProcessingTime()}.
     *
     * <p>Records are interleaved with {@code setProcessingTime} calls on the harness, and after
     * each time advance the accumulated output is compared with the expected rows. All records
     * carry {@code Long.MAX_VALUE} in their third field.
     */
    public void testProcessingTimeSlidingWindows() throws Throwable {
        closeCalled.set(0);
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withProcessingTime().aggregateAndBuild(getTimeWindowAggFunction(), this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes));
        // Accumulates everything the harness is expected to have emitted so far.
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        // Expected rows are (key, sum, count, window start, window end, max timestamp); they are
        // added twice in table-aggregate mode via doubleRecord.
        createTestHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, -2000L, 1000L, 999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        createTestHarness.setProcessingTime(2000L);
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, -1000L, 2000L, 1999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        createTestHarness.setProcessingTime(3000L);
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, 0L, 3000L, 2999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        // A single jump from 3000 to 7000 is expected to produce the rows for several windows at once.
        createTestHarness.setProcessingTime(7000L);
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, 1000L, 4000L, 3999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 5L, 1000L, 4000L, 3999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 5L, 2000L, 5000L, 4999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, 3000L, 6000L, 5999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    /**
     * Exercises a window operator built with {@code tumble(3s)} and {@code withEventTime(2)}.
     *
     * <p>Eight records for two keys are fed in and the watermark is advanced to 1999, at which
     * point only watermarks are expected in the output. The harness is then snapshotted and
     * closed, and a second harness restored from the snapshot verifies the rows emitted at
     * watermarks 2999 and 5999. Finally the test asserts that the close counter equals 2.
     */
    public void testEventTimeTumblingWindows() throws Exception {
        // Reset the shared counter so the final assertion only sees this test's close() calls.
        closeCalled.set(0);
        WindowOperator aggregateAndBuild = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).tumble(Duration.ofSeconds(3L)).withEventTime(2).aggregateAndBuild(getTimeWindowAggFunction(), this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(aggregateAndBuild);
        // Accumulates everything the harness is expected to have emitted so far.
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // No result rows are expected for these two watermarks, only the watermarks themselves.
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        // Snapshot, close the first harness, and continue on a second harness restored from the snapshot.
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        // The restored harness starts with an empty output queue, so expectations start over too.
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness2 = createTestHarness(aggregateAndBuild);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        // Expected rows are (key, sum, count, window start, window end, max timestamp); they are
        // added twice in table-aggregate mode via doubleRecord.
        createTestHarness2.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L)));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
        concurrentLinkedQueue.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L)));
        concurrentLinkedQueue.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        // Only the watermarks themselves are expected for these last two steps.
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.close();
        // Two harnesses were closed above, so the counter is expected to be exactly 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Drives a 3-second event-time tumbling window that is configured with early firings every
     * second of processing time and update-producing output, snapshots and restores the operator
     * half-way through, and verifies the harness output after each step.
     *
     * <p>Expected rows are laid out as (key, two aggregate values, window start, window end,
     * window max timestamp).
     */
    @Test
    public void testEventTimeTumblingWindowsWithEarlyFiring() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        AggregateWindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .tumble(Duration.ofSeconds(3L))
                        .withEventTime(2)
                        .triggering(
                                EventTimeTriggers.afterEndOfWindow()
                                        .withEarlyFirings(
                                                ProcessingTimeTriggers.every(
                                                        Duration.ofSeconds(1L))))
                        .produceUpdates()
                        .aggregate(
                                new SumAndCountAggTimeWindow(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Two key2 elements at processing time 0.
        testHarness.setProcessingTime(0L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));

        // Six more elements (three per key) at processing time 1.
        testHarness.setProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));

        // Processing time 1000, then watermark 999.
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Processing time 1001, watermark 1999, processing time 2001.
        testHarness.setProcessingTime(1001L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.setProcessingTime(2001L);
        expectedOutput.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Snapshot, close, and continue on a fresh harness restored from that snapshot.
        OperatorSubtaskState checkpoint = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness =
                createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();

        // Only the watermark is expected here.
        restoredHarness.setProcessingTime(3001L);
        restoredHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // A third key2 element with timestamp 4999: expect an update pair 2 -> 3.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 4999L));
        restoredHarness.processWatermark(new Watermark(3999L));
        restoredHarness.setProcessingTime(4001L);
        expectedOutput.add(new Watermark(3999L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expectedOutput.add(
                StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Elements with timestamps 2001 and 2030: no additional output is expected.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2001L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2030L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Another key2 element, then watermark 4999: only the watermark is expected.
        restoredHarness.setProcessingTime(5100L);
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5122L));
        restoredHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Watermark 5999: expect an update pair 3 -> 4 followed by the watermark.
        restoredHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        expectedOutput.add(
                StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, 3000L, 6000L, 5999L));
        expectedOutput.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Advancing processing time to 6001 must not add any output.
        restoredHarness.setProcessingTime(6001L);
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Two further watermarks are simply forwarded.
        restoredHarness.processWatermark(new Watermark(6999L));
        restoredHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Elements with timestamps 2877 and 2899: no additional output is expected.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2877L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2899L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        restoredHarness.close();
        // One close per harness instance.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Same scenario as the early-firing test, but the trigger additionally has a late firing on
     * every element and the operator allows 3 seconds of lateness. Elements that arrive for the
     * [0, 3000) window after watermark 3999 are therefore expected to produce update pairs.
     *
     * <p>Expected rows are laid out as (key, two aggregate values, window start, window end,
     * window max timestamp).
     */
    @Test
    public void testEventTimeTumblingWindowsWithEarlyAndLateFirings() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        AggregateWindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .tumble(Duration.ofSeconds(3L))
                        .withEventTime(2)
                        .triggering(
                                EventTimeTriggers.afterEndOfWindow()
                                        .withEarlyFirings(
                                                ProcessingTimeTriggers.every(
                                                        Duration.ofSeconds(1L)))
                                        .withLateFirings(ElementTriggers.every()))
                        .withAllowedLateness(Duration.ofSeconds(3L))
                        .produceUpdates()
                        .aggregate(
                                new SumAndCountAggTimeWindow(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Two key2 elements at processing time 0.
        testHarness.setProcessingTime(0L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));

        // Six more elements (three per key) at processing time 1.
        testHarness.setProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));

        // Processing time 1000, then watermark 999.
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Processing time 1001, watermark 1999, processing time 2001.
        testHarness.setProcessingTime(1001L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, 0L, 3000L, 2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.setProcessingTime(2001L);
        expectedOutput.add(new Watermark(1999L));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Snapshot, close, and continue on a fresh harness restored from that snapshot.
        OperatorSubtaskState checkpoint = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness =
                createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();

        // Only the watermark is expected here.
        restoredHarness.setProcessingTime(3001L);
        restoredHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // A third key2 element with timestamp 4999: expect an update pair 2 -> 3.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 4999L));
        restoredHarness.processWatermark(new Watermark(3999L));
        restoredHarness.setProcessingTime(4001L);
        expectedOutput.add(new Watermark(3999L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, 3000L, 6000L, 5999L));
        expectedOutput.add(
                StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // key2 element with timestamp 2001: expect an update pair 3 -> 4 on [0, 3000).
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2001L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, 0L, 3000L, 2999L));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // key1 element with timestamp 2030: expect an update pair 3 -> 4 on [0, 3000).
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2030L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key1", 3L, 3L, 0L, 3000L, 2999L));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key1", 4L, 4L, 0L, 3000L, 2999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Another key2 element, then watermark 4999: only the watermark is expected.
        restoredHarness.setProcessingTime(5100L);
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5122L));
        restoredHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Watermark 5999: expect an update pair 3 -> 4 on [3000, 6000), then the watermark.
        restoredHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, 3000L, 6000L, 5999L));
        expectedOutput.add(
                StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, 3000L, 6000L, 5999L));
        expectedOutput.add(new Watermark(5999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Advancing processing time to 6001 must not add any output.
        restoredHarness.setProcessingTime(6001L);
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Two further watermarks are simply forwarded.
        restoredHarness.processWatermark(new Watermark(6999L));
        restoredHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Elements with timestamps 2877 and 2899: no additional output is expected.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2877L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2899L));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        restoredHarness.close();
        // One close per harness instance.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Feeds a 3-second processing-time tumbling window and checks the rows emitted once the
     * harness processing time is advanced to 5000 and then to 7000. Also checks that the
     * operator's watermark-latency gauge reads 0 in this scenario.
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .tumble(Duration.ofSeconds(3L))
                        .withProcessingTime()
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Five elements (three key2, two key1) while processing time is 3.
        testHarness.setProcessingTime(3L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 7000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 7000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));

        // Processing time 5000: expect one row per key for [0, 3000).
        testHarness.setProcessingTime(5000L);
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 3L, 3L, 0L, 3000L, 2999L)));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key1", 2L, 2L, 0L, 3000L, 2999L)));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Three more key1 elements while processing time is 5000.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));

        // Processing time 7000: expect the key1 row for [3000, 6000).
        testHarness.setProcessingTime(7000L);
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key1", 3L, 3L, 3000L, 6000L, 5999L)));
        Assert.assertEquals(0L, operator.getWatermarkLatency().getValue());
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Exercises event-time session windows with a 3-second gap across a snapshot/restore cycle.
     * Verifies the emitted session rows and watermarks, the watermark-latency gauge, the number
     * of harness closes, and that exactly one late record is counted as dropped.
     */
    @Test
    public void testEventTimeSessionWindows() throws Exception {
        closeCalled.set(0);

        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .session(Duration.ofSeconds(3L))
                        .withEventTime(2)
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Elements processed before the snapshot.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));

        // Snapshot, close, and continue on a fresh harness restored from that snapshot.
        OperatorSubtaskState checkpoint = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness =
                createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();
        Assert.assertEquals(0L, operator.getWatermarkLatency().getValue());

        // Elements processed after the restore.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 2500L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, 5501L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6050L));

        // Watermark 12000: expect three session rows followed by the watermark.
        restoredHarness.processWatermark(new Watermark(12000L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key1", 6L, 3L, 10L, 5500L, 5499L)));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 6L, 3L, 0L, 5500L, 5499L)));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 20L, 4L, 5501L, 9050L, 9049L)));
        expectedOutput.add(new Watermark(12000L));

        // The key1 element at 4000 is behind watermark 12000 and yields no expected row.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 4000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 20, 15000L));

        // Watermark 17999: expect the key2 session row for [15000, 18000).
        restoredHarness.processWatermark(new Watermark(17999L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 30L, 2L, 15000L, 18000L, 17999L)));
        expectedOutput.add(new Watermark(17999L));
        assertor.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, restoredHarness.getOutput());

        // Processing time 18000 against watermark 17999: the gauge is expected to read 1.
        restoredHarness.setProcessingTime(18000L);
        Assert.assertEquals(1L, operator.getWatermarkLatency().getValue());

        restoredHarness.close();
        // One close per harness instance.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
        Assert.assertEquals(1L, operator.getNumLateRecordsDropped().getCount());
    }

    /**
     * Exercises processing-time session windows with a 3-second gap and checks the session rows
     * emitted once the harness processing time is advanced to 5000 and then to 10000.
     *
     * <p>Uses a locally built {@link RowDataHarnessAssertor} instead of the shared one.
     */
    @Test
    public void testProcessingTimeSessionWindows() throws Throwable {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        WindowOperatorBuilder.builder()
                                .withInputFields(inputFieldTypes)
                                .session(Duration.ofSeconds(3L))
                                .withProcessingTime()
                                .aggregateAndBuild(
                                        getTimeWindowAggFunction(),
                                        equaliser,
                                        accTypes,
                                        aggResultTypes,
                                        windowTypes));
        RowDataHarnessAssertor localAssertor =
                new RowDataHarnessAssertor(
                        outputType.getFieldTypes(),
                        new GenericRowRecordSortComparator(
                                0, new VarCharType(Integer.MAX_VALUE)));
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Two key2 elements, at processing time 3 and 1000.
        testHarness.setProcessingTime(3L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1L));
        testHarness.setProcessingTime(1000L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1002L));

        // Processing time 5000: expect the key2 session row for [3, 4000).
        testHarness.setProcessingTime(5000L);
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 2L, 2L, 3L, 4000L, 3999L)));
        localAssertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, testHarness.getOutput());

        // Five more elements (two key2, three key1) while processing time is 5000.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 5000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 5000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 5000L));

        // Processing time 10000: expect one session row per key for [5000, 8000).
        testHarness.setProcessingTime(10000L);
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 2L, 2L, 5000L, 8000L, 7999L)));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key1", 3L, 3L, 5000L, 8000L, 7999L)));
        localAssertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Runs the operator with a {@code PointSessionWindowAssigner(3000L)} in event time, across a
     * snapshot/restore cycle, and checks the rows emitted when watermark 12000 arrives.
     */
    @Test
    public void testPointSessions() throws Exception {
        closeCalled.set(0);

        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .assigner(new PointSessionWindowAssigner(3000L))
                        .withEventTime(2)
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Two key2 elements before the snapshot.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 33, 1000L));

        // Snapshot, close, and continue on a fresh harness restored from that snapshot.
        OperatorSubtaskState checkpoint = testHarness.snapshot(0L, 0L);
        testHarness.close();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness =
                createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();

        // One more key2 element and three key1 elements after the restore.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 33, 2500L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 33, 2500L));

        // Watermark 12000: expect one row per key, followed by the watermark.
        restoredHarness.processWatermark(new Watermark(12000L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key1", 36L, 3L, 10L, 4000L, 3999L)));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord("key2", 67L, 3L, 0L, 3000L, 2999L)));
        expectedOutput.add(new Watermark(12000L));
        assertor.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, restoredHarness.getOutput());

        restoredHarness.close();
        // One close per harness instance.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Checks a 2-second event-time tumbling window with 500 ms of allowed lateness in
     * update-producing mode: the complete expected output is compared once at the end, and
     * exactly one record is expected to be counted as dropped for lateness.
     */
    @Test
    public void testLateness() throws Exception {
        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .tumble(Duration.ofSeconds(2L))
                        .withEventTime(2)
                        .withAllowedLateness(Duration.ofMillis(500L))
                        .produceUpdates()
                        .aggregateAndBuild(
                                new SumAndCountAggTimeWindow(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // First element, then watermark 1500: only the watermark is expected.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 500L));
        testHarness.processWatermark(new Watermark(1500L));
        expectedOutput.add(new Watermark(1500L));

        // Second element, then watermark 2300: expect the insert row for [0, 2000).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1300L));
        testHarness.processWatermark(new Watermark(2300L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, 0L, 2000L, 1999L));
        expectedOutput.add(new Watermark(2300L));

        // Element at 1997 after watermark 2300: expect an update pair 2 -> 3.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1997L));
        testHarness.processWatermark(new Watermark(6000L));
        expectedOutput.add(
                StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, 0L, 2000L, 1999L));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, 0L, 2000L, 1999L));
        expectedOutput.add(new Watermark(6000L));

        // Element at 1998 after watermark 6000: no row is expected, only the next watermark.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processWatermark(new Watermark(7000L));
        expectedOutput.add(new Watermark(7000L));

        assertor.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, testHarness.getOutput());
        Assert.assertEquals(1L, operator.getNumLateRecordsDropped().getCount());

        testHarness.close();
    }

    /**
     * Places an element so close to {@code Long.MAX_VALUE} that "window max timestamp + allowed
     * lateness" overflows, and checks that the window still emits its row when the watermark
     * reaches the window's max timestamp.
     */
    @Test
    public void testCleanupTimeOverflow() throws Exception {
        // Timestamps are written relative to Long.MAX_VALUE to make the overflow setup visible.
        final long elementTimestamp = Long.MAX_VALUE - 1750;
        final long firstWatermark = Long.MAX_VALUE - 1500;

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        WindowOperatorBuilder.builder()
                                .withInputFields(inputFieldTypes)
                                .tumble(Duration.ofMillis(1000L))
                                .withEventTime(2)
                                .withAllowedLateness(Duration.ofMillis(2000L))
                                .produceUpdates()
                                .aggregateAndBuild(
                                        new SumAndCountAggTimeWindow(),
                                        equaliser,
                                        accTypes,
                                        aggResultTypes,
                                        windowTypes),
                        keySelector,
                        keyType);
        testHarness.open();
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();

        // Ask an equivalent assigner which window the element timestamp falls into.
        TimeWindow window =
                (TimeWindow)
                        TumblingWindowAssigner.of(Duration.ofMillis(1000L))
                                .assignWindows(
                                        GenericRowData.of(
                                                new Object[] {StringData.fromString("key2"), 1}),
                                        elementTimestamp)
                                .iterator()
                                .next();

        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, elementTimestamp));

        // Sanity checks: adding the allowed lateness wraps around to a smaller value.
        Assert.assertTrue(window.maxTimestamp() + 2000 < window.maxTimestamp());
        Assert.assertTrue(window.maxTimestamp() + 2000 < firstWatermark);

        testHarness.processWatermark(new Watermark(firstWatermark));

        // Sanity checks: the first watermark is still before the window's max timestamp.
        Assert.assertTrue(firstWatermark < window.maxTimestamp());
        Assert.assertTrue(window.maxTimestamp() < Long.MAX_VALUE);

        testHarness.processWatermark(new Watermark(window.maxTimestamp()));

        expectedOutput.add(new Watermark(firstWatermark));
        expectedOutput.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        window.getStart(),
                        window.getEnd(),
                        window.maxTimestamp()));
        expectedOutput.add(new Watermark(window.maxTimestamp()));
        assertor.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Sends a single element into a 2-second event-time tumbling window with 1 ms of allowed
     * lateness, followed by four watermarks, and checks that the output is exactly one row plus
     * the forwarded watermarks.
     */
    @Test
    public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        WindowOperatorBuilder.builder()
                                .withInputFields(inputFieldTypes)
                                .tumble(Duration.ofSeconds(2L))
                                .withEventTime(2)
                                .withAllowedLateness(Duration.ofMillis(1L))
                                .produceUpdates()
                                .aggregateAndBuild(
                                        new SumAndCountAggTimeWindow(),
                                        equaliser,
                                        accTypes,
                                        aggResultTypes,
                                        windowTypes));
        testHarness.open();
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();

        // One element, then watermarks stepping past the window end and its allowed lateness.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2000L));
        testHarness.processWatermark(new Watermark(5000L));

        // Expected: the single [0, 2000) row plus all four watermarks.
        expectedOutput.add(new Watermark(1599L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, 0L, 2000L, 1999L));
        expectedOutput.add(new Watermark(1999L));
        expectedOutput.add(new Watermark(2000L));
        expectedOutput.add(new Watermark(5000L));
        assertor.assertOutputEqualsSorted(
                "Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.close();
    }

    /**
     * Exercises a count window of size 3 across a snapshot/restore cycle and checks that a row
     * is emitted for a key each time its third element arrives. Expected rows are
     * (key, two aggregate values, one BIGINT window field).
     */
    @Test
    public void testTumblingCountWindow() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .countWindow(3L)
                        .aggregateAndBuild(
                                getCountWindowAggFunction(),
                                equaliser,
                                accTypes,
                                aggResultTypes,
                                new LogicalType[] {new BigIntType()});

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(operator);
        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        testHarness.open();

        // Three key2 elements and two key1 elements.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));

        // Only key2 has three elements so far, so only key2 is expected in the output.
        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate, StreamRecordUtils.insertRecord("key2", 6L, 3L, 0L)));
        expectedOutput.add(new Watermark(12000L));
        assertor.assertOutputEqualsSorted(failureMessage, expectedOutput, testHarness.getOutput());

        // Snapshot, close, and continue on a fresh harness restored from that snapshot.
        OperatorSubtaskState checkpoint = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness =
                createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(checkpoint);
        restoredHarness.open();

        // Third key1 element (two were processed before the snapshot).
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 3L, 0L)));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Four key2 elements: one more row is expected, covering the first three of them.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, 5501L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6050L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate, StreamRecordUtils.insertRecord("key2", 14L, 3L, 1L)));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // One key1 and two key2 elements: expect the next key2 row.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 4000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 20, 15000L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate, StreamRecordUtils.insertRecord("key2", 36L, 3L, 2L)));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        // Two more key1 elements: expect the next key1 row.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        expectedOutput.addAll(
                doubleRecord(
                        isTableAggregate, StreamRecordUtils.insertRecord("key1", 7L, 3L, 1L)));
        assertor.assertOutputEqualsSorted(
                failureMessage, expectedOutput, restoredHarness.getOutput());

        restoredHarness.close();
        // One close per harness instance.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Exercises a window operator built with {@code countWindow(5L, 3L)} across a
     * snapshot/restore cycle.
     *
     * <p>The test feeds records for two keys into a first harness, checks the first emitted
     * result, snapshots the operator, restores the snapshot into a second harness and keeps
     * feeding records. The expected rows are consistent with a window of five elements that
     * advances by three elements per key (for example 4+5+6+7+8 = 30 for the second window);
     * the exact meaning of the three result columns depends on
     * {@code getCountWindowAggFunction()}, which is defined elsewhere in this class.
     *
     * @throws Exception if the harness or the operator fails
     */
    @Test
    public void testSlidingCountWindow() throws Exception {
        closeCalled.set(0);

        WindowOperator operator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .countWindow(5L, 3L)
                .aggregateAndBuild(getCountWindowAggFunction(), this.equaliser, this.accTypes, this.aggResultTypes, new LogicalType[]{new BigIntType()});

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        // Holds both stream records and watermarks, hence the Object element type.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        testHarness.open();

        // Five records for key2: enough to complete its first window.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 2500L));
        // Only two records for key1 before the snapshot: no result expected for key1 yet.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));

        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);

        expectedOutput.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 15L, 5L, 0L)));
        expectedOutput.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, shut the first harness down and restore into a fresh one.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();

        OneInputStreamOperatorTestHarness<RowData, RowData> restoredHarness = createTestHarness(operator);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        // Three more key1 records on top of the two sent before the snapshot.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 2500L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 4, 2500L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 5, 2500L));
        expectedOutput.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 15L, 5L, 0L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, restoredHarness.getOutput());

        // Further key2 records; the next expected key2 row is (30, 5, 1).
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 7, 6000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 8, 6050L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 9, 6050L));
        expectedOutput.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 30L, 5L, 1L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, restoredHarness.getOutput());

        // Interleave both keys; one more result row is expected for each of them.
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 6, 4000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 7, 4000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key1", 8, 4000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        restoredHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 15000L));
        expectedOutput.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 30L, 5L, 1L)));
        expectedOutput.addAll(doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 45L, 5L, 2L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, restoredHarness.getOutput());

        restoredHarness.close();
        // Two harnesses were closed in this test, so the counter is expected to read 2.
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    /**
     * Builds a count-window operator around a mock generated table-aggregate handle and closes
     * it without ever opening it. The test has no assertions: it passes as long as
     * {@code close()} does not throw.
     *
     * @throws Exception if closing the operator fails
     */
    @Test
    public void testWindowCloseWithoutOpen() throws Exception {
        final LogicalType[] windowPropertyTypes = {new BigIntType()};
        final WindowOperator unopenedOperator = WindowOperatorBuilder.builder()
                .withInputFields(this.inputFieldTypes)
                .countWindow(3L)
                .aggregate(new GeneratedNamespaceTableAggsHandleFunction("MockClass", "MockCode", new Object[0]), this.accTypes, this.aggResultTypes, windowPropertyTypes)
                .build();
        unopenedOperator.close();
    }

    /**
     * Wraps the given window operator in a keyed one-input test harness that uses this test's
     * {@code keySelector} and {@code keyType}.
     *
     * @param windowOperator the operator under test
     * @return a new harness around {@code windowOperator}; this method does not open it
     * @throws Exception if the harness cannot be created
     */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(WindowOperator windowOperator) throws Exception {
        // Diamond instead of a raw constructor call, so the harness type arguments are inferred.
        return new KeyedOneInputStreamOperatorTestHarness<>(windowOperator, this.keySelector, this.keyType);
    }
}
