package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.class */
public class WindowOperatorTest extends TestLogger {
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.1
    });
    private static AtomicInteger closeCalled = new AtomicInteger(0);
    private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.2
    };

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$EventTimeTriggerAccumGC.class */
    private static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1;
        private long cleanupTime;

        public EventTimeTriggerAccumGC(long j) {
            this.cleanupTime = j;
        }

        public TriggerResult onElement(Object obj, long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            }
            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onEventTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) {
            return (j == timeWindow.maxTimestamp() || j == timeWindow.maxTimestamp() + this.cleanupTime) ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        public TriggerResult onProcessingTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        public void clear(TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
        }

        public boolean canMerge() {
            return true;
        }

        public void onMerge(TimeWindow timeWindow, Trigger.OnMergeContext onMergeContext) {
            onMergeContext.registerEventTimeTimer(timeWindow.maxTimestamp());
        }

        public String toString() {
            return "EventTimeTrigger()";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PassThroughFunction.class */
    private static class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private PassThroughFunction() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PassThroughFunction2.class */
    private static class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private PassThroughFunction2() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
            collector.collect("GOT: " + Joiner.on(",").join(iterable));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$PointSessionWindows.class */
    private static class PointSessionWindows extends EventTimeSessionWindows {
        private static final long serialVersionUID = 1;

        private PointSessionWindows(long j) {
            super(j);
        }

        public Collection<TimeWindow> assignWindows(Object obj, long j, WindowAssigner.WindowAssignerContext windowAssignerContext) {
            return ((obj instanceof Tuple2) && ((Integer) ((Tuple2) obj).f1).intValue() == 33) ? Collections.singletonList(new TimeWindow(j, j)) : Collections.singletonList(new TimeWindow(j, j + this.sessionTimeout));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$ReducedProcessSessionWindowFunction.class */
    private static class ReducedProcessSessionWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private ReducedProcessSessionWindowFunction() {
        }

        public void process(String str, ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            TimeWindow window = context.window();
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(new Tuple3(str + "-" + it.next().f1, Long.valueOf(window.getStart()), Long.valueOf(window.getEnd())));
            }
        }

        public /* bridge */ /* synthetic */ void process(Object obj, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
            process((String) obj, (ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>.Context) context, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$ReducedSessionWindowFunction.class */
    private static class ReducedSessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private ReducedSessionWindowFunction() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(new Tuple3(str + "-" + it.next().f1, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd())));
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$RichSumReducer.class */
    private static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1;
        private boolean openCalled;

        private RichSumReducer() {
            this.openCalled = false;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            WindowOperatorTest.closeCalled.incrementAndGet();
        }

        public void apply(String str, W w, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            if (!this.openCalled) {
                Assert.fail("Open was not called");
            }
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple2(str, Integer.valueOf(i)));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (String) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$SessionProcessWindowFunction.class */
    private static class SessionProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private SessionProcessWindowFunction() {
        }

        public void process(String str, ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            TimeWindow window = context.window();
            collector.collect(new Tuple3(str + "-" + i, Long.valueOf(window.getStart()), Long.valueOf(window.getEnd())));
        }

        public /* bridge */ /* synthetic */ void process(Object obj, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
            process((String) obj, (ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow>.Context) context, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$SessionWindowFunction.class */
    private static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1;

        private SessionWindowFunction() {
        }

        public void apply(String str, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
            int i = 0;
            Iterator<Tuple2<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next().f1).intValue();
            }
            collector.collect(new Tuple3(str + "-" + i, Long.valueOf(timeWindow.getStart()), Long.valueOf(timeWindow.getEnd())));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((String) obj, (TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$SumReducer.class */
    private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;

        private SumReducer() {
        }

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
            return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$Tuple2ResultSortComparator.class */
    public static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable {
        private Tuple2ResultSortComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            if (streamRecord.getTimestamp() != streamRecord2.getTimestamp()) {
                return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
            }
            int compareTo = ((String) ((Tuple2) streamRecord.getValue()).f0).compareTo((String) ((Tuple2) streamRecord2.getValue()).f0);
            return compareTo != 0 ? compareTo : ((Integer) ((Tuple2) streamRecord.getValue()).f1).intValue() - ((Integer) ((Tuple2) streamRecord2.getValue()).f1).intValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$Tuple3ResultSortComparator.class */
    private static class Tuple3ResultSortComparator implements Comparator<Object>, Serializable {
        private Tuple3ResultSortComparator() {
        }

        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord streamRecord = (StreamRecord) obj;
            StreamRecord streamRecord2 = (StreamRecord) obj2;
            if (streamRecord.getTimestamp() != streamRecord2.getTimestamp()) {
                return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
            }
            int compareTo = ((String) ((Tuple3) streamRecord.getValue()).f0).compareTo((String) ((Tuple3) streamRecord2.getValue()).f0);
            if (compareTo != 0) {
                return compareTo;
            }
            int longValue = (int) (((Long) ((Tuple3) streamRecord.getValue()).f1).longValue() - ((Long) ((Tuple3) streamRecord2.getValue()).f1).longValue());
            return longValue != 0 ? longValue : (int) (((Long) ((Tuple3) streamRecord.getValue()).f2).longValue() - ((Long) ((Tuple3) streamRecord2.getValue()).f2).longValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest$TupleKeySelector.class */
    public static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1;

        private TupleKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
            return (String) tuple2.f0;
        }
    }

    private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperator) throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(oneInputStreamOperator);
        createTestHarness.setup();
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(oneInputStreamOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), 3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testSlidingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        testSlidingEventTimeWindows(new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null));
    }

    @Test
    public void testSlidingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        testSlidingEventTimeWindows(new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new RichSumReducer()), EventTimeTrigger.create(), 0L, (OutputTag) null));
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    private void testTumblingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> oneInputStreamOperator) throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(oneInputStreamOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(oneInputStreamOperator);
        concurrentLinkedQueue.clear();
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testTumblingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        testTumblingEventTimeWindows(new WindowOperator(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null));
    }

    @Test
    public void testTumblingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        testTumblingEventTimeWindows(new WindowOperator(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new RichSumReducer()), EventTimeTrigger.create(), 0L, (OutputTag) null));
        Assert.assertEquals("Close was not called.", 2L, closeCalled.get());
    }

    @Test
    public void testSessionWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        createTestHarness2.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        createTestHarness2.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testSessionWindowsWithProcessFunction() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableProcessWindowFunction(new SessionProcessWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        createTestHarness2.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        createTestHarness2.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testReduceSessionWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        createTestHarness2.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        createTestHarness2.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testReduceSessionWindowsWithProcessFunction() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueProcessWindowFunction(new ReducedProcessSessionWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 4), 5501L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 6), 6050L));
        createTestHarness2.processWatermark(new Watermark(12000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-6", 10L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-20", 5501L, 9050L), 9049L));
        concurrentLinkedQueue.add(new Watermark(12000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 10), 15000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 20), 15000L));
        createTestHarness2.processWatermark(new Watermark(17999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-30", 15000L, 18000L), 17999L));
        concurrentLinkedQueue.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testSessionWindowsWithCountTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), PurgingTrigger.of(CountTrigger.of(4L)), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-10", 0L, 6500L), 6499L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), ContinuousEventTimeTrigger.of(Time.seconds(2L)), 0L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 1500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
        createTestHarness.processWatermark(new Watermark(2500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-1", 1500L, 4500L), 4499L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-6", 0L, 5500L), 5499L));
        concurrentLinkedQueue.add(new Watermark(2500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 4000L));
        createTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new Watermark(3000L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 2), 4000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
        createTestHarness2.processWatermark(new Watermark(4000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-3", 1500L, 7000L), 6999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-15", 0L, 7000L), 6999L));
        concurrentLinkedQueue.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testPointSessions() throws Exception {
        closeCalled.set(0);
        WindowOperator windowOperator = new WindowOperator(new PointSessionWindows(3000L), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        Throwable th = null;
        try {
            try {
                createTestHarness.open();
                createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
                createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 33), 1000L));
                OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
                OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(windowOperator);
                Throwable th3 = null;
                try {
                    createTestHarness2.setup();
                    createTestHarness2.initializeState(snapshot);
                    createTestHarness2.open();
                    createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 33), 2500L));
                    createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
                    createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
                    createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 33), 2500L));
                    createTestHarness2.processWatermark(new Watermark(12000L));
                    concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-36", 10L, 4000L), 3999L));
                    concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-67", 0L, 3000L), 2999L));
                    concurrentLinkedQueue.add(new Watermark(12000L));
                    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput(), new Tuple3ResultSortComparator());
                    if (createTestHarness2 != null) {
                        if (0 == 0) {
                            createTestHarness2.close();
                            return;
                        }
                        try {
                            createTestHarness2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    if (createTestHarness2 != null) {
                        if (0 != 0) {
                            try {
                                createTestHarness2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            createTestHarness2.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                th = th7;
                throw th7;
            }
        } catch (Throwable th8) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th8;
        }
    }

    private static <OUT> OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, OUT> createTestHarness(OneInputStreamOperator<Tuple2<String, Integer>, OUT> oneInputStreamOperator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator) oneInputStreamOperator, (KeySelector) new TupleKeySelector(), (TypeInformation) BasicTypeInfo.STRING_TYPE_INFO);
    }

    @Test
    public void testContinuousWatermarkTrigger() throws Exception {
        closeCalled.set(0);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ContinuousEventTimeTrigger.of(Time.of(3L, TimeUnit.SECONDS)), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1000L));
        concurrentLinkedQueue.add(new Watermark(1000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new Watermark(3000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(4000L));
        concurrentLinkedQueue.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new Watermark(6000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processWatermark(new Watermark(7000L));
        createTestHarness.processWatermark(new Watermark(8000L));
        concurrentLinkedQueue.add(new Watermark(7000L));
        concurrentLinkedQueue.add(new Watermark(8000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCountTrigger() throws Exception {
        closeCalled.set(0);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(4L)), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        OneInputStreamOperatorTestHarness createTestHarness2 = createTestHarness(new WindowOperator(GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(CountTrigger.of(4L)), 0L, (OutputTag) null));
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, Iterables.concat(output, createTestHarness2.getOutput()), new Tuple2ResultSortComparator());
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key1", 1), 10999L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness2.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, Iterables.concat(output, createTestHarness2.getOutput()), new Tuple2ResultSortComparator());
        createTestHarness2.close();
    }

    @Test
    public void testProcessingTimeTumblingWindows() throws Throwable {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        createTestHarness.setProcessingTime(5000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 7000L));
        createTestHarness.setProcessingTime(7000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testProcessingTimeSlidingWindows() throws Throwable {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(SlidingProcessingTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        createTestHarness.setProcessingTime(1000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), Long.MAX_VALUE));
        createTestHarness.setProcessingTime(2000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        createTestHarness.setProcessingTime(3000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), Long.MAX_VALUE));
        createTestHarness.setProcessingTime(7000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 5), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 5), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testProcessingTimeSessionWindows() throws Throwable {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(ProcessingTimeSessionWindows.withGap(Time.of(3L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), ProcessingTimeTrigger.create(), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1L));
        createTestHarness.setProcessingTime(1000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1002L));
        createTestHarness.setProcessingTime(5000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 5000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 5000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 5000L));
        createTestHarness.setProcessingTime(10000L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 7999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 3), 7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        Assert.assertEquals(concurrentLinkedQueue.size(), createTestHarness.getOutput().size());
        Iterator<Object> it = createTestHarness.getOutput().iterator();
        while (it.hasNext()) {
            Object next = it.next();
            if (next instanceof StreamRecord) {
                Assert.assertTrue(concurrentLinkedQueue.contains((StreamRecord) next));
            }
        }
        createTestHarness.close();
    }

    @Test
    public void testDynamicEventTimeSessionWindows() throws Exception {
        closeCalled.set(0);
        SessionWindowTimeGapExtractor sessionWindowTimeGapExtractor = (SessionWindowTimeGapExtractor) Mockito.mock(SessionWindowTimeGapExtractor.class);
        Mockito.when(Long.valueOf(sessionWindowTimeGapExtractor.extract(Matchers.any(Tuple2.class)))).thenAnswer(invocationOnMock -> {
            Tuple2 tuple2 = (Tuple2) invocationOnMock.getArguments()[0];
            String str = (String) tuple2.f0;
            boolean z = -1;
            switch (str.hashCode()) {
                case 3288498:
                    if (str.equals("key1")) {
                        z = false;
                        break;
                    }
                    break;
                case 3288499:
                    if (str.equals("key2")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return 3000L;
                case true:
                    switch (((Integer) tuple2.f1).intValue()) {
                        case 10:
                            return 1000L;
                        default:
                            return 2000L;
                    }
                default:
                    return 0L;
            }
        });
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(DynamicEventTimeSessionWindows.withDynamicGap(sessionWindowTimeGapExtractor), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), EventTimeTrigger.create(), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 10L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 5000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness.processWatermark(new Watermark(8999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-3", 10L, 3010L), 3009L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-9", 5000L, 8000L), 7999L));
        concurrentLinkedQueue.add(new Watermark(8999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 9000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 10000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 10500L));
        createTestHarness.processWatermark(new Watermark(12999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-13", 9000L, 12000L), 11999L));
        concurrentLinkedQueue.add(new Watermark(12999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 13000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 13500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14000L));
        createTestHarness.processWatermark(new Watermark(16999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-21", 13000L, 16000L), 15999L));
        concurrentLinkedQueue.add(new Watermark(16999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testDynamicProcessingTimeSessionWindows() throws Exception {
        closeCalled.set(0);
        SessionWindowTimeGapExtractor sessionWindowTimeGapExtractor = (SessionWindowTimeGapExtractor) Mockito.mock(SessionWindowTimeGapExtractor.class);
        Mockito.when(Long.valueOf(sessionWindowTimeGapExtractor.extract(Matchers.any(Tuple2.class)))).thenAnswer(invocationOnMock -> {
            Tuple2 tuple2 = (Tuple2) invocationOnMock.getArguments()[0];
            String str = (String) tuple2.f0;
            boolean z = -1;
            switch (str.hashCode()) {
                case 3288498:
                    if (str.equals("key1")) {
                        z = false;
                        break;
                    }
                    break;
                case 3288499:
                    if (str.equals("key2")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return 3000L;
                case true:
                    switch (((Integer) tuple2.f1).intValue()) {
                        case 10:
                            return 1000L;
                        default:
                            return 2000L;
                    }
                default:
                    return 0L;
            }
        });
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(DynamicProcessingTimeSessionWindows.withDynamicGap(sessionWindowTimeGapExtractor), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new SessionWindowFunction()), ProcessingTimeTrigger.create(), 0L, (OutputTag) null));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.open();
        createTestHarness.setProcessingTime(10L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 10L));
        createTestHarness.setProcessingTime(5000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 5000L));
        createTestHarness.setProcessingTime(6000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 5), 6000L));
        createTestHarness.setProcessingTime(8999L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key1-3", 10L, 3010L), 3009L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-9", 5000L, 8000L), 7999L));
        createTestHarness.setProcessingTime(9000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 9000L));
        createTestHarness.setProcessingTime(10000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 10000L));
        createTestHarness.setProcessingTime(10500L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 10500L));
        createTestHarness.setProcessingTime(10500L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-13", 9000L, 12000L), 11999L));
        createTestHarness.setProcessingTime(13000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 13000L));
        createTestHarness.setProcessingTime(13500L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 10), 13500L));
        createTestHarness.setProcessingTime(14000L);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14000L));
        createTestHarness.setProcessingTime(16999L);
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-21", 13000L, 16000L), 15999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testLateness() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 500L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 500L));
        createTestHarness.processWatermark(new Watermark(1500L));
        concurrentLinkedQueue.add(new Watermark(1500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1300L));
        createTestHarness.processWatermark(new Watermark(2300L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 1999L));
        concurrentLinkedQueue.add(new Watermark(2300L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1997L));
        createTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(6000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processWatermark(new Watermark(7000L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key2", 1), 1998L));
        concurrentLinkedQueue.add(new Watermark(7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", concurrentLinkedQueue2, createTestHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimeOverflow() throws Exception {
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        TumblingEventTimeWindows of = TumblingEventTimeWindows.of(Time.milliseconds(1000L));
        final WindowOperator windowOperator = new WindowOperator(of, new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), reducingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 2000L, (OutputTag) null);
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(windowOperator);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        TimeWindow timeWindow = (TimeWindow) Iterables.getOnlyElement(of.assignWindows(new Tuple2("key2", 1), 9223372036854774057L, new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.3
            public long getCurrentProcessingTime() {
                return windowOperator.windowAssignerContext.getCurrentProcessingTime();
            }
        }));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 9223372036854774057L));
        Assert.assertTrue(timeWindow.maxTimestamp() + 2000 < timeWindow.maxTimestamp());
        Assert.assertTrue(timeWindow.maxTimestamp() + 2000 < 9223372036854774307L);
        createTestHarness.processWatermark(new Watermark(9223372036854774307L));
        Assert.assertTrue(9223372036854774307L < timeWindow.maxTimestamp());
        Assert.assertTrue(timeWindow.maxTimestamp() < Long.MAX_VALUE);
        createTestHarness.processWatermark(new Watermark(timeWindow.maxTimestamp()));
        concurrentLinkedQueue.add(new Watermark(9223372036854774307L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), timeWindow.maxTimestamp()));
        concurrentLinkedQueue.add(new Watermark(timeWindow.maxTimestamp()));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testSideOutputDueToLatenessTumbling() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1985L));
        concurrentLinkedQueue.add(new Watermark(1985L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1980L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key2", 1), 1998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2001L));
        createTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(new Watermark(2999L));
        createTestHarness.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", concurrentLinkedQueue2, createTestHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testSideOutputDueToLatenessSliding() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(SlidingEventTimeWindows.of(Time.of(3L, TimeUnit.SECONDS), Time.of(1L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 0L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(3000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 2), 2999L));
        concurrentLinkedQueue.add(new Watermark(3000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3900L));
        createTestHarness.processWatermark(new Watermark(6000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 5), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 3999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 4), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 4999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 5999L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key1", 2), 5999L));
        concurrentLinkedQueue.add(new Watermark(6000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 3001L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key1", 1), 3001L));
        createTestHarness.processWatermark(new Watermark(25000L));
        concurrentLinkedQueue.add(new Watermark(25000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", concurrentLinkedQueue2, createTestHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 0L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key2", 1), 10000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10100L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key2", 1), 10100L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        AbstractCollection sideOutput = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output, new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", concurrentLinkedQueue2, sideOutput, new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testSideOutputDueToLatenessSessionZeroLateness() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 0L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue2.add(new StreamRecord(new Tuple2("key2", 1), 10000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 14500L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        AbstractCollection sideOutput = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output, new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", concurrentLinkedQueue2, sideOutput, new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 10L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 10000L, 14600L), 14599L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 10000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple3ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-2", 10000L, 14600L), 14599L));
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        AbstractCollection sideOutput = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput);
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-3", 10000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> output2 = createTestHarness.getOutput();
        AbstractCollection sideOutput2 = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output2, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput2);
        createTestHarness.close();
    }

    @Test
    public void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), 10000L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 1000L, 14600L), 14599L));
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        AbstractCollection sideOutput = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 1000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> output2 = createTestHarness.getOutput();
        AbstractCollection sideOutput2 = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output2, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput2);
        createTestHarness.close();
    }

    @Test
    public void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10000L, lateOutputTag));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 2000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 4500L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 8500L));
        createTestHarness.processWatermark(new Watermark(7400L));
        concurrentLinkedQueue.add(new Watermark(7400L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 7000L));
        createTestHarness.processWatermark(new Watermark(11501L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-5", 1000L, 11500L), 11499L));
        concurrentLinkedQueue.add(new Watermark(11501L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 11600L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 11600L, 14600L), 14599L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 10000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-7", 1000L, 14600L), 14599L));
        ConcurrentLinkedQueue<Object> output = createTestHarness.getOutput();
        AbstractCollection sideOutput = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput);
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 14500L));
        createTestHarness.processWatermark(new Watermark(20000L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-8", 1000L, 17500L), 17499L));
        concurrentLinkedQueue.add(new Watermark(20000L));
        createTestHarness.processWatermark(new Watermark(100000L));
        concurrentLinkedQueue.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> output2 = createTestHarness.getOutput();
        AbstractCollection sideOutput2 = createTestHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, output2, new Tuple3ResultSortComparator());
        Assert.assertEquals((Object) null, sideOutput2);
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction2()), new EventTimeTriggerAccumGC(100L), 100L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1599L));
        createTestHarness.processWatermark(new Watermark(1999L));
        createTestHarness.processWatermark(new Watermark(2100L));
        createTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord("GOT: (key2,1)", 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2100L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 1L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1599L));
        createTestHarness.processWatermark(new Watermark(1999L));
        createTestHarness.processWatermark(new Watermark(2000L));
        createTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new PassThroughWindowFunction()), EventTimeTrigger.create(), 1L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1599L));
        createTestHarness.processWatermark(new Watermark(1999L));
        createTestHarness.processWatermark(new Watermark(2000L));
        createTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception {
        FoldingStateDescriptor foldingStateDescriptor = new FoldingStateDescriptor("window-contents", new Tuple2((String) null, 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.4
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }, STRING_INT_TUPLE);
        foldingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(TumblingEventTimeWindows.of(Time.of(2L, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), foldingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 1L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(1599L));
        createTestHarness.processWatermark(new Watermark(1999L));
        createTestHarness.processWatermark(new Watermark(2000L));
        createTestHarness.processWatermark(new Watermark(5000L));
        concurrentLinkedQueue.add(new Watermark(1599L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(2000L));
        concurrentLinkedQueue.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 10L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig())), new InternalSingleValueWindowFunction(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), 10L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple3("key2-1", 1000L, 4000L), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }

    @Test
    public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
        FoldingStateDescriptor foldingStateDescriptor = new FoldingStateDescriptor("window-contents", new Tuple2((String) null, 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.5
            private static final long serialVersionUID = 1;

            public Tuple2<String, Integer> fold(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple22.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        }, STRING_INT_TUPLE);
        foldingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        OneInputStreamOperatorTestHarness createTestHarness = createTestHarness(new WindowOperator(EventTimeSessionWindows.withGap(Time.seconds(3L)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), foldingStateDescriptor, new InternalSingleValueWindowFunction(new PassThroughFunction()), EventTimeTrigger.create(), 10L, (OutputTag) null));
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
        createTestHarness.processWatermark(new Watermark(4998L));
        concurrentLinkedQueue.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        concurrentLinkedQueue.add(new Watermark(4998L));
        createTestHarness.processWatermark(new Watermark(14600L));
        concurrentLinkedQueue.add(new Watermark(14600L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput(), new Tuple2ResultSortComparator());
        createTestHarness.close();
    }
}
