/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class EvictingWindowOperatorTest {
    /**
     * Type information for {@code Tuple2<String, Integer>} elements; the tests use it to create
     * the element serializer that backs the "window-contents" list state.
     */
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});

    /**
     * Runs an {@code EvictingWindowOperator} over global windows with a count trigger and a
     * {@code CountEvictor} configured to evict after the window function has been applied.
     *
     * <p>The trigger fires every {@code triggerCount} elements per key. Because eviction happens
     * after each firing, a firing may see more than {@code windowSize} elements: the expected
     * sums for "key2" are 2, 4, 6 and 6, and for "key1" 2 and 4.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testCountEvictorEvictAfter() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int windowSize = 4;
        final int triggerCount = 2;
        final boolean evictAfter = true;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                CountEvictor.of(windowSize, evictAfter),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: five elements for key2 (fires twice) and three for key1 (fires once).
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        // key2 has received six elements so far and the expected sum is 6, not windowSize.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 6), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Third batch: two more key2 elements; the expected sum stays at 6, i.e. the previous
        // firing left only windowSize elements behind.
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 6), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} over global windows with a count trigger and a
     * two-second {@code TimeEvictor} configured to evict after the window function has run.
     *
     * <p>The trigger fires every {@code triggerCount} elements per key. The expected sums are
     * 2, 3 and 5 for "key2" and 2 and 4 for "key1".
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testTimeEvictorEvictAfter() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int triggerCount = 2;
        final boolean evictAfter = true;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                TimeEvictor.of(Time.seconds(2L), evictAfter),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: key2 fires on its 2nd and 4th element, key1 on its 2nd.
        // The second key2 result is 3 rather than 4: the element at t=1000 is no longer in
        // the window after the first firing (it is more than two seconds older than t=4000).
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 4000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3500L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 2001L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1001L));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        // The old elements at t=1001 and t=1002 are still summed (5) because eviction runs
        // only after the window function.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1002L));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 5), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} over tumbling event-time windows of
     * {@code windowSize} seconds with a count trigger and a two-second {@code TimeEvictor}
     * created through the single-argument factory (no evict-after flag).
     *
     * <p>The trigger fires every {@code triggerCount} elements per key and window. The expected
     * results carry the timestamps 3999 and 7999, and the expected sums (1, 3, 3 for "key2";
     * 2 and 2 for "key1") are smaller than the number of elements received, which reflects
     * eviction before the window function is applied.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testTimeEvictorEvictBefore() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int triggerCount = 2;
        final int windowSize = 4;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
                new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                TimeEvictor.of(Time.seconds(2L)),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch. The key1 element at t=5999 falls outside the first four-second window,
        // so it does not contribute to any result expected here.
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 5999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3500L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 2001L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1001L));

        // key2's first firing sums to 1: of {1000, 3999} only the newer element survives
        // the two-second eviction.
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 1), 3999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), 3999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 3999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: key1 completes a pair in the second window (result timestamp 7999);
        // key2 fires again in the first window, with the old elements evicted (sum stays 3).
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 6500L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1002L));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), 7999L));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 3999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} with a two-second evict-after {@code TimeEvictor}
     * on records that are created without a timestamp.
     *
     * <p>The trigger fires every {@code triggerCount} elements per key. The expected sums
     * (2, 4, 6 for "key2"; 2, 4 for "key1") equal the number of elements received so far,
     * i.e. the test expects that nothing is evicted when records carry no timestamp.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testTimeEvictorNoTimestamp() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int triggerCount = 2;
        final boolean evictAfter = true;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                TimeEvictor.of(Time.seconds(2L), evictAfter),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: five elements for key2 (fires twice) and three for key1 (fires once).
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 6), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} over global windows with a count trigger and a
     * {@code DeltaEvictor} configured to evict before the window function is applied.
     *
     * <p>The delta function is {@code new.f1 - old.f1} and the evictor is created with
     * {@code threshold}. The trigger fires every {@code triggerCount} elements per key. The
     * expected sums (4, 11, 10 for "key2"; 2, 8 for "key1") are consistent with dropping every
     * element whose delta to the most recent element reaches the threshold before summing.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testDeltaEvictorEvictBefore() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int triggerCount = 2;
        final boolean evictAfter = false;
        final int threshold = 2;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {

                    @Override
                    public double getDelta(
                            Tuple2<String, Integer> oldDataPoint,
                            Tuple2<String, Integer> newDataPoint) {
                        return newDataPoint.f1 - oldDataPoint.f1;
                    }
                }, evictAfter),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: key2 receives 1, 4, 5, 6, 1 and key1 receives 1, 1, 5.
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 4), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 5), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 5), initialTime + 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 6), initialTime + 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        // key2: {1, 4} -> 1 is dropped (4 - 1 >= 2), sum 4; {4, 5, 6} -> 4 is dropped, sum 11.
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 11), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 3), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 10), initialTime + 1000L));

        // key1: {1, 1, 5, 3} -> both 1s are dropped, sum 8. key2: only the 10 remains.
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 8), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 10), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} over global windows with a count trigger and a
     * {@code DeltaEvictor} configured to evict after the window function has been applied.
     *
     * <p>The delta function is {@code new.f1 - old.f1} and the evictor is created with
     * {@code threshold}. The trigger fires every {@code triggerCount} elements per key. The
     * expected sums (5, 15, 22 for "key2"; 2, 16 for "key1") include elements that exceed the
     * threshold at firing time, because they are removed only after the function has run.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testDeltaEvictorEvictAfter() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int triggerCount = 2;
        final boolean evictAfter = true;
        final int threshold = 2;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(triggerCount),
                DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {

                    @Override
                    public double getDelta(
                            Tuple2<String, Integer> oldDataPoint,
                            Tuple2<String, Integer> newDataPoint) {
                        return newDataPoint.f1 - oldDataPoint.f1;
                    }
                }, evictAfter),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: key2 receives 1, 4, 5, 6, 1 and key1 receives 1, 1, 5.
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 4), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 5), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 5), initialTime + 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 6), initialTime + 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        // key2: {1, 4} sums to 5, then 1 is dropped; {4, 5, 6} sums to 15, then 4 is dropped.
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 5), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 15), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 9), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 10), initialTime + 1000L));

        // key1: {1, 1, 5, 9} sums to 16. key2: {5, 6, 1, 10} sums to 22.
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 16), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 22), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Runs an {@code EvictingWindowOperator} over global windows that behaves like a sliding
     * count window: the trigger fires every {@code windowSlide} elements per key and a
     * {@code CountEvictor} created with {@code windowSize} (single-argument factory) is applied.
     *
     * <p>The window function is a {@code ReduceApplyWindowFunction} wrapping {@code SumReducer}
     * (defined elsewhere in this file) and a {@code PassThroughWindowFunction}. The expected
     * sums for "key2" are 2, 4 and 4, i.e. they never exceed {@code windowSize}; for "key1"
     * they are 2 and 4.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testCountTrigger() throws Exception {
        final int windowSize = 4;
        final int windowSlide = 2;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(
                        new ReduceApplyWindowFunction(
                                new SumReducer(), new PassThroughWindowFunction())),
                CountTrigger.of(windowSlide),
                CountEvictor.of(windowSize),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: five elements for key2 (fires twice) and three for key1 (fires once).
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        // key2 has received six elements, but the expected sum is capped at windowSize.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();
    }

    /**
     * Same scenario as a sliding count window, but with {@code RichSumReducer} as the window
     * function: the trigger fires every {@code windowSlide} elements per key and a
     * {@code CountEvictor} created with {@code windowSize} (single-argument factory) is applied.
     *
     * <p>The expected sums for "key2" are 2, 4 and 4, i.e. they never exceed {@code windowSize};
     * for "key1" they are 2 and 4.
     *
     * <p>{@code RichSumReducer} is defined elsewhere in this file; the final assertion expects it
     * to have bumped {@code closeCalled} exactly once by the time the harness is closed.
     *
     * @throws Exception if the test harness fails while opening, processing or closing
     */
    @Test
    public void testCountTriggerWithApply() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        final int windowSize = 4;
        final int windowSlide = 2;

        // Raw types are kept on purpose: the generic signatures of the helper classes used
        // below are not needed for the test and are declared outside this method.
        TypeSerializer streamRecordSerializer =
                new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc =
                new ListStateDescriptor("window-contents", streamRecordSerializer);

        EvictingWindowOperator operator = new EvictingWindowOperator(
                GlobalWindows.create(),
                new GlobalWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                CountTrigger.of(windowSlide),
                CountEvictor.of(windowSize),
                0L,
                null);

        KeyedOneInputStreamOperatorTestHarness testHarness =
                new KeyedOneInputStreamOperatorTestHarness(
                        operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.open();

        // First batch: five elements for key2 (fires twice) and three for key1 (fires once).
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 20L));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime));
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key1", 2), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        // Second batch: one more element per key completes another firing for each.
        // key2 has received six elements, but the expected sum is capped at windowSize.
        testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), initialTime + 10999L));
        testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), initialTime + 1000L));

        expectedOutput.add(new StreamRecord(new Tuple2("key1", 4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), Long.MAX_VALUE));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                testHarness.getOutput(),
                new ResultSortComparator());

        testHarness.close();

        Assert.assertEquals("Close was not called.", 1L, closeCalled.get());
    }

    /**
     * Drives an {@link EvictingWindowOperator} configured with tumbling event-time windows,
     * an {@link EventTimeTrigger} and a {@link CountEvictor}, using a {@link RichSumReducer}
     * as the window function.
     *
     * <p>Six "key1" and two "key2" elements are sent with timestamps below 4000. The expected
     * output (emitted with timestamp 3999) is a sum of 4 for "key1" and 2 for "key2", i.e. the
     * "key1" result reflects only {@code windowSize} elements. The test also verifies that the
     * rich function's {@code close()} ran exactly once when the harness was closed.
     *
     * @throws Exception if the harness or the operator fails
     */
    @Test
    public void testTumblingWindowWithApply() throws Exception {
        AtomicInteger closeCalled = new AtomicInteger(0);
        // Used both as the window length (in seconds) and as the evictor's element count.
        int windowSize = 4;
        StreamElementSerializer streamRecordSerializer = new StreamElementSerializer(STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", (TypeSerializer)streamRecordSerializer);
        EvictingWindowOperator operator = new EvictingWindowOperator(
                (WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)windowSize, (TimeUnit)TimeUnit.SECONDS)),
                (TypeSerializer)new TimeWindow.Serializer(),
                (KeySelector)new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer(closeCalled)),
                (Trigger)EventTimeTrigger.create(),
                (Evictor)CountEvictor.of((long)windowSize),
                0L,
                null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new TupleKeySelector(), (TypeInformation<String>)BasicTypeInfo.STRING_TYPE_INFO);
        long initialTime = 0L;
        testHarness.open();

        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 10L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 100L));
        // This watermark is expected to be forwarded without any window result preceding it.
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 1997L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 1998L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 2310L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), initialTime + 2310L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), initialTime + 2310L));
        testHarness.processElement(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), initialTime + 2310L));
        // Results for both keys are expected between the two watermarks, stamped 3999.
        testHarness.processWatermark(new Watermark(3999L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(1999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)4), 3999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 3999L));
        expectedOutput.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

        testHarness.close();
        // The counter was handed to RichSumReducer, which increments it in close().
        Assert.assertEquals((String)"Close was not called.", (long)1L, (long)closeCalled.get());
    }

    private static class TupleKeySelector
    implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        private TupleKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return (String)value.f0;
        }
    }

    /**
     * Comparator used to sort operator output before comparing it with the expected output.
     *
     * <p>If either argument is a {@link Watermark} the pair is reported as equal. Otherwise
     * both arguments are cast to {@link StreamRecord}s carrying a {@code Tuple2} value and are
     * ordered by timestamp, then by the tuple's {@code f0} (as a String), then by its
     * {@code f1} (as an Integer).
     */
    private static class ResultSortComparator
    implements Comparator<Object> {
        private ResultSortComparator() {
        }

        /**
         * Compares two output elements.
         *
         * @param o1 first element, a {@link Watermark} or a {@link StreamRecord}
         * @param o2 second element, a {@link Watermark} or a {@link StreamRecord}
         * @return 0 if either element is a watermark; otherwise a negative, zero or positive
         *         value following timestamp, then key, then value ordering
         * @throws ClassCastException if a non-watermark element is not a {@link StreamRecord}
         *         holding a {@code Tuple2} with a String {@code f0} and an Integer {@code f1}
         */
        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord<?> sr0 = (StreamRecord<?>)o1;
            StreamRecord<?> sr1 = (StreamRecord<?>)o2;

            // Long.compare instead of casting the difference to int: the subtraction can
            // overflow and the narrowing cast can flip the sign or yield 0 for timestamps
            // that are far apart (e.g. Long.MAX_VALUE versus a small value).
            int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }

            Tuple2<?, ?> v0 = (Tuple2<?, ?>)sr0.getValue();
            Tuple2<?, ?> v1 = (Tuple2<?, ?>)sr1.getValue();
            int byKey = ((String)v0.f0).compareTo((String)v1.f0);
            if (byKey != 0) {
                return byKey;
            }

            // Integer.compare avoids overflow of a plain int subtraction.
            return Integer.compare((Integer)v0.f1, (Integer)v1.f1);
        }
    }

    private static class RichSumReducer<W extends Window>
    extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1L;
        private boolean openCalled = false;
        private AtomicInteger closeCalled = new AtomicInteger(0);

        public RichSumReducer(AtomicInteger closeCalled) {
            this.closeCalled = closeCalled;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            this.closeCalled.incrementAndGet();
        }

        public void apply(String key, W window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (!this.openCalled) {
                Assert.fail((String)"Open was not called");
            }
            int sum = 0;
            for (Tuple2<String, Integer> t : input) {
                sum += ((Integer)t.f1).intValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }

    private static class SumReducer
    implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private SumReducer() {
        }

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2(value2.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }
}

