/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.StreamRecordMatchers;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public abstract class WindowOperatorContractTest
extends TestLogger {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private static ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor("string-state", (TypeSerializer)StringSerializer.INSTANCE, null);

    static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception {
        InternalWindowFunction mockWindowFunction = (InternalWindowFunction)Mockito.mock(InternalWindowFunction.class);
        return mockWindowFunction;
    }

    static <T, W extends Window> Trigger<T, W> mockTrigger() throws Exception {
        Trigger mockTrigger = (Trigger)Mockito.mock(Trigger.class);
        Mockito.when((Object)mockTrigger.onElement(Matchers.any(), Matchers.anyLong(), (Window)Matchers.any(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)Matchers.any(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
        Mockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)Matchers.any(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
        return mockTrigger;
    }

    static <T> WindowAssigner<T, TimeWindow> mockTimeWindowAssigner() throws Exception {
        WindowAssigner mockAssigner = (WindowAssigner)Mockito.mock(WindowAssigner.class);
        Mockito.when((Object)mockAssigner.getWindowSerializer((ExecutionConfig)Mockito.any())).thenReturn((Object)new TimeWindow.Serializer());
        Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        return mockAssigner;
    }

    static <T> WindowAssigner<T, GlobalWindow> mockGlobalWindowAssigner() throws Exception {
        WindowAssigner mockAssigner = (WindowAssigner)Mockito.mock(WindowAssigner.class);
        Mockito.when((Object)mockAssigner.getWindowSerializer((ExecutionConfig)Mockito.any())).thenReturn((Object)new GlobalWindow.Serializer());
        Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        Mockito.when((Object)mockAssigner.assignWindows(Mockito.any(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(GlobalWindow.get()));
        return mockAssigner;
    }

    static <T> MergingWindowAssigner<T, TimeWindow> mockMergingAssigner() throws Exception {
        MergingWindowAssigner mockAssigner = (MergingWindowAssigner)Mockito.mock(MergingWindowAssigner.class);
        Mockito.when((Object)mockAssigner.getWindowSerializer((ExecutionConfig)Mockito.any())).thenReturn((Object)new TimeWindow.Serializer());
        Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        return mockAssigner;
    }

    static WindowAssigner.WindowAssignerContext anyAssignerContext() {
        return (WindowAssigner.WindowAssignerContext)Mockito.any();
    }

    static Trigger.TriggerContext anyTriggerContext() {
        return (Trigger.TriggerContext)Mockito.any();
    }

    static <T> Collector<T> anyCollector() {
        return (Collector)Mockito.any();
    }

    static Iterable<Integer> anyIntIterable() {
        return (Iterable)Mockito.any();
    }

    static Iterable<Integer> intIterable(Integer ... values) {
        return (Iterable)MockitoHamcrest.argThat((Matcher)org.hamcrest.Matchers.contains((Object[])values));
    }

    static TimeWindow anyTimeWindow() {
        return (TimeWindow)Mockito.any();
    }

    static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() {
        return (InternalWindowFunction.InternalWindowContext)Mockito.any();
    }

    static Trigger.OnMergeContext anyOnMergeContext() {
        return (Trigger.OnMergeContext)Mockito.any();
    }

    static MergingWindowAssigner.MergeCallback anyMergeCallback() {
        return (MergingWindowAssigner.MergeCallback)Mockito.any();
    }

    static <T> void shouldRegisterEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(timestamp);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
    }

    private static <T> void shouldDeleteEventTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.deleteEventTimeTimer(timestamp);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
    }

    private static <T> void shouldRegisterProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerProcessingTimeTimer(timestamp);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
    }

    private static <T> void shouldDeleteProcessingTimeTimerOnElement(Trigger<T, TimeWindow> mockTrigger, final long timestamp) throws Exception {
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.deleteProcessingTimeTimer(timestamp);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
    }

    private static <T, W extends Window> void shouldMergeWindows(MergingWindowAssigner<T, W> assigner, final Collection<? extends W> expectedWindows, final Collection<W> toMerge, final W mergeResult) {
        ((MergingWindowAssigner)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Collection windows = (Collection)invocation.getArguments()[0];
                MergingWindowAssigner.MergeCallback callback = (MergingWindowAssigner.MergeCallback)invocation.getArguments()[1];
                Assert.assertThat((Object)windows, (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])expectedWindows.toArray()));
                callback.merge(toMerge, (Object)mergeResult);
                return null;
            }
        }).when(assigner)).mergeWindows(Matchers.anyCollection(), (MergingWindowAssigner.MergeCallback)Matchers.anyObject());
    }

    private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
    }

    private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
    }

    private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.PURGE);
    }

    private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE_AND_PURGE);
    }

    private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
    }

    private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
    }

    private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.PURGE);
    }

    private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE_AND_PURGE);
    }

    private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.CONTINUE);
    }

    private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
    }

    private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.PURGE);
    }

    private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
        Mockito.when((Object)mockTrigger.onProcessingTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE_AND_PURGE);
    }

    @Test
    public void testNoLateSideOutputForSkippedWindows() throws Exception {
        OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, lateOutputTag);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.emptyList());
        testHarness.processWatermark(0L);
        testHarness.processElement(new StreamRecord((Object)0, 5L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)5L), WindowOperatorContractTest.anyAssignerContext());
        Assert.assertTrue((testHarness.getSideOutput(lateOutputTag) == null || testHarness.getSideOutput(lateOutputTag).isEmpty() ? 1 : 0) != 0);
    }

    @Test
    public void testLateSideOutput() throws Exception {
        OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late"){};
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, lateOutputTag);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));
        testHarness.processWatermark(20L);
        testHarness.processElement(new StreamRecord((Object)0, 5L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)5L), WindowOperatorContractTest.anyAssignerContext());
        Assert.assertThat(testHarness.getSideOutput(lateOutputTag), (Matcher)org.hamcrest.Matchers.contains(StreamRecordMatchers.streamRecord(0, 5L)));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.emptyList());
        testHarness.processElement(new StreamRecord((Object)0, 10L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)5L), WindowOperatorContractTest.anyAssignerContext());
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)10L), WindowOperatorContractTest.anyAssignerContext());
        Assert.assertThat(testHarness.getSideOutput(lateOutputTag), (Matcher)org.hamcrest.Matchers.contains((Matcher[])new Matcher[]{StreamRecordMatchers.streamRecord(0, 5L), StreamRecordMatchers.streamRecord(0, 10L)}));
    }

    @Test
    public void testSideOutput() throws Exception {
        OutputTag<Integer> integerOutputTag = new OutputTag<Integer>("int-out"){};
        OutputTag<Long> longOutputTag = new OutputTag<Long>("long-out"){};
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> windowFunction = new InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow>((OutputTag)integerOutputTag, (OutputTag)longOutputTag){
            final /* synthetic */ OutputTag val$integerOutputTag;
            final /* synthetic */ OutputTag val$longOutputTag;
            {
                this.val$integerOutputTag = outputTag;
                this.val$longOutputTag = outputTag2;
            }

            public void process(Integer integer, TimeWindow window, InternalWindowFunction.InternalWindowContext ctx, Iterable<Integer> input, Collector<Void> out) throws Exception {
                Integer inputValue = input.iterator().next();
                ctx.output(this.val$integerOutputTag, (Object)inputValue);
                ctx.output(this.val$longOutputTag, (Object)inputValue.longValue());
            }

            public void clear(TimeWindow window, InternalWindowFunction.InternalWindowContext context) throws Exception {
            }
        };
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, windowFunction);
        testHarness.open();
        long windowEnd = 42L;
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 42L)));
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)17, 5L));
        Assert.assertThat(testHarness.getSideOutput(integerOutputTag), (Matcher)org.hamcrest.Matchers.contains(StreamRecordMatchers.streamRecord(17, 41L)));
        Assert.assertThat(testHarness.getSideOutput(longOutputTag), (Matcher)org.hamcrest.Matchers.contains(StreamRecordMatchers.streamRecord(17L, 41L)));
    }

    @Test
    public void testAssignerIsInvokedOncePerElement() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 0L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)1))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)0L), WindowOperatorContractTest.anyAssignerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((WindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)2))).assignWindows((Object)Matchers.eq((int)0), Matchers.eq((long)0L), WindowOperatorContractTest.anyAssignerContext());
    }

    @Test
    public void testAssignerWithMultipleWindows() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
    }

    @Test
    public void testWindowsDontInterfere() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 2L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 1L)));
        testHarness.processElement(new StreamRecord((Object)1, 0L));
        Assert.assertTrue((boolean)testHarness.extractOutputStreamRecords().isEmpty());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)testHarness.numEventTimeTimers());
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 1L)));
        testHarness.processElement(new StreamRecord((Object)1, 0L));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 2L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0, 0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)1), (Window)Matchers.eq((Object)new TimeWindow(0L, 1L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(1, 1), WindowOperatorContractTest.anyCollector());
    }

    @Test
    public void testOnElementCalledPerWindow() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        testHarness.processElement(new StreamRecord((Object)42, 1L));
        ((Trigger)Mockito.verify(mockTrigger)).onElement((Object)Matchers.eq((int)42), Matchers.eq((long)1L), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.verify(mockTrigger)).onElement((Object)Matchers.eq((int)42), Matchers.eq((long)1L), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)2))).onElement((Object)Matchers.anyInt(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
    }

    @Test
    public void testEmittingFromWindowFunction() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 2L)));
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                return TriggerResult.FIRE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Exception {
                Collector out = (Collector)invocation.getArgument(4);
                out.collect((Object)"Hallo");
                out.collect((Object)"Ciao");
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        Assert.assertThat(testHarness.extractOutputStreamRecords(), (Matcher)org.hamcrest.Matchers.contains((Matcher[])new Matcher[]{StreamRecordMatchers.streamRecord("Hallo", 1L), StreamRecordMatchers.streamRecord("Ciao", 1L)}));
    }

    @Test
    public void testEmittingFromWindowFunctionOnEventTime() throws Exception {
        this.testEmittingFromWindowFunction(new EventTimeAdaptor());
    }

    @Test
    public void testEmittingFromWindowFunctionOnProcessingTime() throws Exception {
        this.testEmittingFromWindowFunction(new ProcessingTimeAdaptor());
    }

    private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Collections.singletonList(new TimeWindow(0L, 2L)));
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Exception {
                Collector out = (Collector)invocation.getArgument(4);
                out.collect((Object)"Hallo");
                out.collect((Object)"Ciao");
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.never())).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        Assert.assertTrue((boolean)testHarness.extractOutputStreamRecords().isEmpty());
        timeAdaptor.shouldFireOnTime(mockTrigger);
        timeAdaptor.advanceTime(testHarness, 1L);
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        Assert.assertThat(testHarness.extractOutputStreamRecords(), (Matcher)org.hamcrest.Matchers.contains((Matcher[])new Matcher[]{StreamRecordMatchers.streamRecord("Hallo", 1L), StreamRecordMatchers.streamRecord("Ciao", 1L)}));
    }

    @Test
    public void testOnElementContinue() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                TimeWindow window = (TimeWindow)invocation.getArguments()[2];
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(window.getEnd());
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
    }

    @Test
    public void testOnElementFire() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                TimeWindow window = (TimeWindow)invocation.getArguments()[2];
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(window.getEnd());
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.FIRE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
    }

    @Test
    public void testOnElementFireAndPurge() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                TimeWindow window = (TimeWindow)invocation.getArguments()[2];
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(window.getEnd());
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.FIRE_AND_PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
    }

    @Test
    public void testOnElementPurge() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
    }

    @Test
    public void testOnEventTimeContinue() throws Exception {
        this.testOnTimeContinue(new EventTimeAdaptor());
    }

    @Test
    public void testOnProcessingTimeContinue() throws Exception {
        this.testOnTimeContinue(new ProcessingTimeAdaptor());
    }

    private void testOnTimeContinue(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.shouldContinueOnTime(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 0L);
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
    }

    @Test
    public void testOnEventTimeFire() throws Exception {
        this.testOnTimeFire(new EventTimeAdaptor());
    }

    @Test
    public void testOnProcessingTimeFire() throws Exception {
        this.testOnTimeFire(new ProcessingTimeAdaptor());
    }

    private void testOnTimeFire(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.shouldFireOnTime(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 0L);
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testOnEventTimeFireAndPurge() throws Exception {
        this.testOnTimeFireAndPurge(new EventTimeAdaptor());
    }

    @Test
    public void testOnProcessingTimeFireAndPurge() throws Exception {
        this.testOnTimeFireAndPurge(new ProcessingTimeAdaptor());
    }

    private void testOnTimeFireAndPurge(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.shouldFireAndPurgeOnTime(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 0L);
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testOnEventTimePurge() throws Exception {
        this.testOnTimePurge(new EventTimeAdaptor());
    }

    @Test
    public void testOnProcessingTimePurge() throws Exception {
        this.testOnTimePurge(new ProcessingTimeAdaptor());
    }

    private void testOnTimePurge(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(4L, 6L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 1L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.shouldPurgeOnTime(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)4L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 1L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
    }

    @Test
    public void testNoEventTimeFiringForPurgedWindow() throws Exception {
        this.testNoTimerFiringForPurgedWindow(new EventTimeAdaptor());
    }

    @Test
    public void testNoProcessingTimeFiringForPurgedWindow() throws Exception {
        this.testNoTimerFiringForPurgedWindow(new ProcessingTimeAdaptor());
    }

    private void testNoTimerFiringForPurgedWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = (InternalWindowFunction)Mockito.mock(InternalWindowFunction.class);
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 0L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        ((InternalWindowFunction)Mockito.verify((Object)mockWindowFunction, (VerificationMode)Mockito.never())).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testNoEventTimeFiringForPurgedMergingWindow() throws Exception {
        this.testNoTimerFiringForPurgedMergingWindow(new EventTimeAdaptor());
    }

    @Test
    public void testNoProcessingTimeFiringForPurgedMergingWindow() throws Exception {
        this.testNoTimerFiringForPurgedMergingWindow(new ProcessingTimeAdaptor());
    }

    private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = (InternalWindowFunction)Mockito.mock(InternalWindowFunction.class);
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 0L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        ((InternalWindowFunction)Mockito.verify((Object)mockWindowFunction, (VerificationMode)Mockito.never())).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testNoEventTimeFiringForGarbageCollectedMergingWindow() throws Exception {
        this.testNoTimerFiringForGarbageCollectedMergingWindow(new EventTimeAdaptor());
    }

    @Test
    public void testNoProcessingTimeFiringForGarbageCollectedMergingWindow() throws Exception {
        this.testNoTimerFiringForGarbageCollectedMergingWindow(new ProcessingTimeAdaptor());
    }

    private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = (InternalWindowFunction)Mockito.mock(InternalWindowFunction.class);
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L)));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 10L);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldContinueOnTime(mockTrigger);
        timeAdaptor.advanceTime(testHarness, 4L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        ((InternalWindowFunction)Mockito.verify((Object)mockWindowFunction, (VerificationMode)Mockito.never())).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        timeAdaptor.advanceTime(testHarness, 10L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
    }

    @Test
    public void testEventTimeTimerCreationAndDeletion() throws Exception {
        this.testTimerCreationAndDeletion(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeTimerCreationAndDeletion() throws Exception {
        this.testTimerCreationAndDeletion(new ProcessingTimeAdaptor());
    }

    private void testTimerCreationAndDeletion(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)3L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 42L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 17L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testEventTimeTimerFiring() throws Exception {
        this.testTimerFiring(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeTimerFiring() throws Exception {
        this.testTimerFiring(new ProcessingTimeAdaptor());
    }

    private void testTimerFiring(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 100L)));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 17L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 42L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 1L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.atLeastOnce(), 1L, new TimeWindow(0L, 100L));
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        Assert.assertEquals((long)3L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 15L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        timeAdaptor.advanceTime(testHarness, 42L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.atLeastOnce(), 17L, new TimeWindow(0L, 100L));
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.atLeastOnce(), 42L, new TimeWindow(0L, 100L));
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)3), null, null);
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testEventTimeDeletedTimerDoesNotFire() throws Exception {
        this.testDeletedTimerDoesNotFire(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeDeletedTimerDoesNotFire() throws Exception {
        this.testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
    }

    private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 100L)));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldDeleteTimerOnElement(mockTrigger, 1L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 2L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 50L);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)0), 1L, null);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), 2L, new TimeWindow(0L, 100L));
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testMergeWindowsIsCalled() throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((MergingWindowAssigner)Mockito.verify(mockAssigner)).mergeWindows((Collection)Matchers.eq(Collections.singletonList(new TimeWindow(2L, 4L))), WindowOperatorContractTest.anyMergeCallback());
        ((MergingWindowAssigner)Mockito.verify(mockAssigner)).mergeWindows((Collection)Matchers.eq(Collections.singletonList(new TimeWindow(2L, 4L))), WindowOperatorContractTest.anyMergeCallback());
        ((MergingWindowAssigner)Mockito.verify(mockAssigner, (VerificationMode)Mockito.times((int)2))).mergeWindows(Matchers.anyCollection(), WindowOperatorContractTest.anyMergeCallback());
    }

    @Test
    public void testEventTimeWindowsAreMergedEagerly() throws Exception {
        this.testWindowsAreMergedEagerly(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeWindowsAreMergedEagerly() throws Exception {
        this.testWindowsAreMergedEagerly(new ProcessingTimeAdaptor());
    }

    private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.OnMergeContext context = (Trigger.OnMergeContext)invocation.getArguments()[1];
                timeAdaptor.registerTimer((Trigger.TriggerContext)context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onMerge((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyOnMergeContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                timeAdaptor.deleteTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)3L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L)));
        WindowOperatorContractTest.shouldMergeWindows(mockAssigner, new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 2L), new TimeWindow(2L, 4L))), new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 2L), new TimeWindow(2L, 4L))), new TimeWindow(0L, 4L));
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger)).onMerge((Window)Matchers.eq((Object)new TimeWindow(0L, 4L)), WindowOperatorContractTest.anyOnMergeContext());
        Assert.assertEquals((long)3L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testRejectShrinkingMergingEventTimeWindows() throws Exception {
        this.testRejectShrinkingMergingWindows(new EventTimeAdaptor());
    }

    @Test
    public void testRejectShrinkingMergingProcessingTimeWindows() throws Exception {
        this.testRejectShrinkingMergingWindows(new ProcessingTimeAdaptor());
    }

    void testRejectShrinkingMergingWindows(TimeDomainAdaptor timeAdaptor) throws Exception {
        int allowedLateness = 10;
        if (timeAdaptor instanceof ProcessingTimeAdaptor) {
            allowedLateness = 0;
        }
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, allowedLateness, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, 0L);
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 22L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 25L)));
        timeAdaptor.advanceTime(testHarness, 20L);
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        WindowOperatorContractTest.shouldMergeWindows(mockAssigner, new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 22L), new TimeWindow(0L, 25L))), new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 22L), new TimeWindow(0L, 25L))), new TimeWindow(0L, (long)(20 - allowedLateness + 2)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 25L)));
        WindowOperatorContractTest.shouldMergeWindows(mockAssigner, new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, (long)(20 - allowedLateness + 2)), new TimeWindow(0L, 25L))), new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, (long)(20 - allowedLateness + 2)), new TimeWindow(0L, 25L))), new TimeWindow(0L, (long)(20 - allowedLateness + 1)));
        this.expectedException.expect(UnsupportedOperationException.class);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
    }

    @Test
    public void testMergingOfExistingEventTimeWindows() throws Exception {
        this.testMergingOfExistingWindows(new EventTimeAdaptor());
    }

    @Test
    public void testMergingOfExistingProcessingTimeWindows() throws Exception {
        this.testMergingOfExistingWindows(new ProcessingTimeAdaptor());
    }

    private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.OnMergeContext context = (Trigger.OnMergeContext)invocation.getArguments()[1];
                timeAdaptor.registerTimer((Trigger.TriggerContext)context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onMerge((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyOnMergeContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                timeAdaptor.deleteTimer(context, 0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)3L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L)));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)5L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)timeAdaptor.numTimers(testHarness));
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(1L, 3L)));
        WindowOperatorContractTest.shouldMergeWindows(mockAssigner, new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 2L), new TimeWindow(2L, 4L), new TimeWindow(1L, 3L))), new ArrayList<TimeWindow>(Arrays.asList(new TimeWindow(0L, 2L), new TimeWindow(2L, 4L), new TimeWindow(1L, 3L))), new TimeWindow(0L, 4L));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)3L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
    }

    @Test
    public void testOnElementPurgeDoesNotCleanupMergingSet() throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
    }

    @Test
    public void testOnEventTimePurgeDoesNotCleanupMergingSet() throws Exception {
        this.testOnTimePurgeDoesNotCleanupMergingSet(new EventTimeAdaptor());
    }

    @Test
    public void testOnProcessingTimePurgeDoesNotCleanupMergingSet() throws Exception {
        this.testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
    }

    private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 4L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        timeAdaptor.shouldPurgeOnTime(mockTrigger);
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        timeAdaptor.advanceTime(testHarness, 1L);
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
    }

    @Test
    public void testNoEventTimeGarbageCollectionTimerForGlobalWindow() throws Exception {
        this.testNoGarbageCollectionTimerForGlobalWindow(new EventTimeAdaptor());
    }

    @Test
    public void testNoProcessingTimeGarbageCollectionTimerForGlobalWindow() throws Exception {
        this.testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
    }

    private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockGlobalWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        Assert.assertEquals((long)Long.MAX_VALUE, (long)GlobalWindow.get().maxTimestamp());
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
    }

    @Test
    public void testNoEventTimeGarbageCollectionTimerForLongMax() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 0x7FFFFFFFFFFFFFF5L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
    }

    @Test
    public void testProcessingTimeGarbageCollectionTimerIsAlwaysWindowMaxTimestamp() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)false);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 0x7FFFFFFFFFFFFFF5L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)1L, (long)testHarness.numProcessingTimeTimers());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.setProcessingTime(0x7FFFFFFFFFFFFFF5L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
    }

    @Test
    public void testEventTimeGarbageCollectionTimer() throws Exception {
        this.testGarbageCollectionTimer(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeGarbageCollectionTimer() throws Exception {
        this.testGarbageCollectionTimer(new ProcessingTimeAdaptor());
    }

    private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
        long allowedLateness = 20L;
        if (timeAdaptor instanceof ProcessingTimeAdaptor) {
            allowedLateness = 0L;
        }
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.shouldFireOnTime(mockTrigger);
        timeAdaptor.advanceTime(testHarness, 19L + allowedLateness);
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), 19L + allowedLateness, null);
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 20L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
    }

    @Test
    public void testEventTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception {
        this.testTriggerTimerAndGarbageCollectionTimerCoincide(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeTriggerTimerAndGarbageCollectionTimerCoincide() throws Exception {
        this.testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
    }

    private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 19L);
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.advanceTime(testHarness, 19L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        timeAdaptor.verifyTriggerCallback(mockTrigger, Mockito.times((int)1), null, null);
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
    }

    @Test
    public void testStateAndTimerCleanupAtEventTimeGarbageCollection() throws Exception {
        this.testStateAndTimerCleanupAtEventTimeGarbageCollection(new EventTimeAdaptor());
    }

    @Test
    public void testStateAndTimerCleanupAtProcessingTimeGarbageCollection() throws Exception {
        this.testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
    }

    private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                timeAdaptor.deleteTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
        timeAdaptor.advanceTime(testHarness, 39L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
    }

    @Test
    public void testStateAndTimerCleanupAtEventTimeGarbageCollectionWithPurgingTrigger() throws Exception {
        this.testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new EventTimeAdaptor());
    }

    @Test
    public void testStateAndTimerCleanupAtProcessingTimeGarbageCollectionWithPurgingTrigger() throws Exception {
        this.testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(new ProcessingTimeAdaptor());
    }

    private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                timeAdaptor.deleteTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
        timeAdaptor.advanceTime(testHarness, 39L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
    }

    @Test
    public void testStateAndTimerCleanupAtEventTimeGarbageCollectionWithPurgingTriggerAndMergingWindows() throws Exception {
        this.testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(new EventTimeAdaptor());
    }

    @Test
    public void testStateAndTimerCleanupAtProcessingTimeGarbageCollectionWithPurgingTriggerAndMergingWindows() throws Exception {
        this.testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(new ProcessingTimeAdaptor());
    }

    private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 20L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                timeAdaptor.registerTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.PURGE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                timeAdaptor.deleteTimer(context, 1000L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)2L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
        timeAdaptor.advanceTime(testHarness, 39L);
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimersOtherDomain(testHarness));
    }

    @Test
    public void testMergingWindowSetClearedAtEventTimeGarbageCollection() throws Exception {
        this.testMergingWindowSetClearedAtGarbageCollection(new EventTimeAdaptor());
    }

    @Test
    public void testMergingWindowSetClearedAtProcessingTimeGarbageCollection() throws Exception {
        this.testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor());
    }

    private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        timeAdaptor.setIsEventTime((WindowAssigner<?, ?>)mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 20L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 39L);
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testProcessingElementsWithinAllowedLateness() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processWatermark(new Watermark(20L));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)1L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)testHarness.numEventTimeTimers());
    }

    @Test
    public void testLateWindowDropping() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        testHarness.processWatermark(new Watermark(21L));
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.numProcessingTimeTimers());
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
    }

    @Test
    public void testStateSnapshotAndRestore() throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        ((Trigger)Mockito.doAnswer((Answer)new Answer<TriggerResult>(){

            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[3];
                context.registerEventTimeTimer(0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).update((Object)"hello");
                return TriggerResult.CONTINUE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        WindowOperatorContractTest.shouldFireAndPurgeOnEventTime(mockTrigger);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)5L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        mockTrigger = WindowOperatorContractTest.mockTrigger();
        ((Trigger)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                Trigger.TriggerContext context = (Trigger.TriggerContext)invocation.getArguments()[1];
                context.deleteEventTimeTimer(0L);
                ((ValueState)context.getPartitionedState((StateDescriptor)valueStateDescriptor)).clear();
                return null;
            }
        }).when(mockTrigger)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        Mockito.when((Object)mockTrigger.onEventTime(Matchers.eq((long)0L), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
        mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 0L, (InternalWindowFunction)mockWindowFunction);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        Assert.assertEquals((long)0L, (long)testHarness.extractOutputStreamRecords().size());
        Assert.assertEquals((long)5L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)4L, (long)testHarness.numEventTimeTimers());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.never())).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        testHarness.processWatermark(new Watermark(20L));
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)2))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)2))).process((Object)Matchers.eq((int)0), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.eq((int)0), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.intIterable(0), WindowOperatorContractTest.anyCollector());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)4))).onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).onEventTime(Matchers.eq((long)0L), (Window)Matchers.eq((Object)new TimeWindow(0L, 2L)), WindowOperatorContractTest.anyTriggerContext());
        ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)Mockito.times((int)1))).onEventTime(Matchers.eq((long)0L), (Window)Matchers.eq((Object)new TimeWindow(2L, 4L)), WindowOperatorContractTest.anyTriggerContext());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)testHarness.numEventTimeTimers());
    }

    @Test
    public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception {
        this.testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
    }

    @Test
    public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception {
        this.testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor());
    }

    public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockTrigger.onElement((Object)Matchers.anyInt(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
                context.windowState().getState(valueStateDescriptor).update((Object)"hello");
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
                context.windowState().getState(valueStateDescriptor).clear();
                return null;
            }
        }).when(mockWindowFunction)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext());
        Assert.assertEquals((long)0L, (long)testHarness.getOutput().size());
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertEquals((long)2L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)1L, (long)timeAdaptor.numTimers(testHarness));
        timeAdaptor.advanceTime(testHarness, 39L);
        Assert.assertEquals((long)0L, (long)testHarness.numKeyedStateEntries());
        Assert.assertEquals((long)0L, (long)timeAdaptor.numTimers(testHarness));
    }

    @Test
    public void testWindowStateNotAvailableToMergingWindows() throws Exception {
        MergingWindowAssigner mockAssigner = WindowOperatorContractTest.mockMergingAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator((WindowAssigner)mockAssigner, (Trigger)mockTrigger, 20L, (InternalWindowFunction)mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockTrigger.onElement((Object)Matchers.anyInt(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
                context.windowState().getState(valueStateDescriptor).update((Object)"hello");
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        this.expectedException.expect(UnsupportedOperationException.class);
        this.expectedException.expectMessage("Per-window state is not allowed when using merging windows.");
        testHarness.processElement(new StreamRecord((Object)0, 0L));
    }

    @Test
    public void testEventTimeQuerying() throws Exception {
        this.testCurrentTimeQuerying(new EventTimeAdaptor());
    }

    @Test
    public void testProcessingTimeQuerying() throws Exception {
        this.testCurrentTimeQuerying(new ProcessingTimeAdaptor());
    }

    @Test
    public void testStateTypeIsConsistentFromWindowStateAndGlobalState() throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        Mockito.when((Object)mockTrigger.onElement((Object)Matchers.anyInt(), Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext())).thenReturn((Object)TriggerResult.FIRE);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        final AtomicBoolean processWasInvoked = new AtomicBoolean(false);
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
                KeyedStateStore windowKeyedStateStore = context.windowState();
                KeyedStateStore globalKeyedStateStore = context.globalState();
                ListStateDescriptor windowListStateDescriptor = new ListStateDescriptor("windowListState", String.class);
                ListStateDescriptor globalListStateDescriptor = new ListStateDescriptor("globalListState", String.class);
                Assert.assertEquals(windowKeyedStateStore.getListState(windowListStateDescriptor).getClass(), globalKeyedStateStore.getListState(globalListStateDescriptor).getClass());
                ValueStateDescriptor windowValueStateDescriptor = new ValueStateDescriptor("windowValueState", String.class);
                ValueStateDescriptor globalValueStateDescriptor = new ValueStateDescriptor("globalValueState", String.class);
                Assert.assertEquals(windowKeyedStateStore.getState(windowValueStateDescriptor).getClass(), globalKeyedStateStore.getState(globalValueStateDescriptor).getClass());
                class NoOpAggregateFunction
                implements AggregateFunction<String, String, String> {
                    NoOpAggregateFunction() {
                    }

                    public String createAccumulator() {
                        return null;
                    }

                    public String add(String value, String accumulator) {
                        return null;
                    }

                    public String getResult(String accumulator) {
                        return null;
                    }

                    public String merge(String a, String b) {
                        return null;
                    }
                }
                AggregatingStateDescriptor windowAggStateDesc = new AggregatingStateDescriptor("windowAgg", (AggregateFunction)new NoOpAggregateFunction(), String.class);
                AggregatingStateDescriptor globalAggStateDesc = new AggregatingStateDescriptor("globalAgg", (AggregateFunction)new NoOpAggregateFunction(), String.class);
                Assert.assertEquals(windowKeyedStateStore.getAggregatingState(windowAggStateDesc).getClass(), globalKeyedStateStore.getAggregatingState(globalAggStateDesc).getClass());
                ReducingStateDescriptor windowReducingStateDesc = new ReducingStateDescriptor("windowReducing", (ReduceFunction & Serializable)(a, b) -> a, String.class);
                ReducingStateDescriptor globalReducingStateDesc = new ReducingStateDescriptor("globalReducing", (ReduceFunction & Serializable)(a, b) -> a, String.class);
                Assert.assertEquals(windowKeyedStateStore.getReducingState(windowReducingStateDesc).getClass(), globalKeyedStateStore.getReducingState(globalReducingStateDesc).getClass());
                FoldingStateDescriptor windowFoldingStateDescriptor = new FoldingStateDescriptor("windowFolding", (Object)"", (FoldFunction & Serializable)(a, b) -> a, String.class);
                FoldingStateDescriptor globalFoldingStateDescriptor = new FoldingStateDescriptor("globalFolding", (Object)"", (FoldFunction & Serializable)(a, b) -> a, String.class);
                Assert.assertEquals(windowKeyedStateStore.getFoldingState(windowFoldingStateDescriptor).getClass(), globalKeyedStateStore.getFoldingState(globalFoldingStateDescriptor).getClass());
                MapStateDescriptor windowMapStateDescriptor = new MapStateDescriptor("windowMapState", String.class, String.class);
                MapStateDescriptor globalMapStateDescriptor = new MapStateDescriptor("globalMapState", String.class, String.class);
                Assert.assertEquals(windowKeyedStateStore.getMapState(windowMapStateDescriptor).getClass(), globalKeyedStateStore.getMapState(globalMapStateDescriptor).getClass());
                processWasInvoked.set(true);
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        Assert.assertTrue((boolean)processWasInvoked.get());
    }

    public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception {
        WindowAssigner mockAssigner = WindowOperatorContractTest.mockTimeWindowAssigner();
        timeAdaptor.setIsEventTime(mockAssigner);
        Trigger mockTrigger = WindowOperatorContractTest.mockTrigger();
        InternalWindowFunction mockWindowFunction = WindowOperatorContractTest.mockWindowFunction();
        final KeyedOneInputStreamOperatorTestHarness testHarness = this.createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
        testHarness.open();
        WindowOperatorContractTest.shouldFireOnElement(mockTrigger);
        Mockito.when((Object)mockAssigner.assignWindows((Object)Matchers.anyInt(), Matchers.anyLong(), WindowOperatorContractTest.anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(0L, 20L)));
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
                timeAdaptor.verifyCorrectTime(testHarness, context);
                return null;
            }
        }).when(mockWindowFunction)).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
                timeAdaptor.verifyCorrectTime(testHarness, context);
                return null;
            }
        }).when(mockWindowFunction)).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext());
        timeAdaptor.advanceTime(testHarness, 10L);
        testHarness.processElement(new StreamRecord((Object)0, 0L));
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).process((Object)Matchers.anyInt(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext(), WindowOperatorContractTest.anyIntIterable(), WindowOperatorContractTest.anyCollector());
        timeAdaptor.advanceTime(testHarness, 100L);
        ((InternalWindowFunction)Mockito.verify(mockWindowFunction, (VerificationMode)Mockito.times((int)1))).clear((Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyInternalWindowContext());
    }

    protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> var1, Trigger<Integer, W> var2, long var3, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> var5, OutputTag<Integer> var6) throws Exception;

    protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> var1, Trigger<Integer, W> var2, long var3, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> var5) throws Exception;

    private static class ProcessingTimeAdaptor
    implements TimeDomainAdaptor {
        private ProcessingTimeAdaptor() {
        }

        @Override
        public void setIsEventTime(WindowAssigner<?, ?> mockAssigner) {
            Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)false);
        }

        @Override
        public void advanceTime(OneInputStreamOperatorTestHarness testHarness, long timestamp) throws Exception {
            testHarness.setProcessingTime(timestamp);
        }

        @Override
        public void registerTimer(Trigger.TriggerContext ctx, long timestamp) {
            ctx.registerProcessingTimeTimer(timestamp);
        }

        @Override
        public void deleteTimer(Trigger.TriggerContext ctx, long timestamp) {
            ctx.deleteProcessingTimeTimer(timestamp);
        }

        @Override
        public int numTimers(AbstractStreamOperatorTestHarness testHarness) {
            return testHarness.numProcessingTimeTimers();
        }

        @Override
        public int numTimersOtherDomain(AbstractStreamOperatorTestHarness testHarness) {
            return testHarness.numEventTimeTimers();
        }

        @Override
        public void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception {
            WindowOperatorContractTest.shouldRegisterProcessingTimeTimerOnElement(mockTrigger, timestamp);
        }

        @Override
        public void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception {
            WindowOperatorContractTest.shouldDeleteProcessingTimeTimerOnElement(mockTrigger, timestamp);
        }

        @Override
        public void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldContinueOnProcessingTime(mockTrigger);
        }

        @Override
        public void shouldFireOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldFireOnProcessingTime(mockTrigger);
        }

        @Override
        public void shouldFireAndPurgeOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldFireAndPurgeOnProcessingTime(mockTrigger);
        }

        @Override
        public void shouldPurgeOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldPurgeOnProcessingTime(mockTrigger);
        }

        @Override
        public void verifyTriggerCallback(Trigger<?, TimeWindow> mockTrigger, VerificationMode verificationMode, Long time, TimeWindow window) throws Exception {
            if (time == null && window == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onProcessingTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
            } else if (time == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onProcessingTime(Matchers.anyLong(), (Window)Matchers.eq((Object)window), WindowOperatorContractTest.anyTriggerContext());
            } else if (window == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onProcessingTime(((Long)Matchers.eq((Object)time)).longValue(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
            } else {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onProcessingTime(((Long)Matchers.eq((Object)time)).longValue(), (Window)Matchers.eq((Object)window), WindowOperatorContractTest.anyTriggerContext());
            }
        }

        @Override
        public void verifyCorrectTime(OneInputStreamOperatorTestHarness testHarness, InternalWindowFunction.InternalWindowContext context) {
            Assert.assertEquals((long)testHarness.getProcessingTime(), (long)context.currentProcessingTime());
        }
    }

    private static class EventTimeAdaptor
    implements TimeDomainAdaptor {
        private EventTimeAdaptor() {
        }

        @Override
        public void setIsEventTime(WindowAssigner<?, ?> mockAssigner) {
            Mockito.when((Object)mockAssigner.isEventTime()).thenReturn((Object)true);
        }

        @Override
        public void advanceTime(OneInputStreamOperatorTestHarness testHarness, long timestamp) throws Exception {
            testHarness.processWatermark(new Watermark(timestamp));
        }

        @Override
        public void registerTimer(Trigger.TriggerContext ctx, long timestamp) {
            ctx.registerEventTimeTimer(timestamp);
        }

        @Override
        public void deleteTimer(Trigger.TriggerContext ctx, long timestamp) {
            ctx.deleteEventTimeTimer(timestamp);
        }

        @Override
        public int numTimers(AbstractStreamOperatorTestHarness testHarness) {
            return testHarness.numEventTimeTimers();
        }

        @Override
        public int numTimersOtherDomain(AbstractStreamOperatorTestHarness testHarness) {
            return testHarness.numProcessingTimeTimers();
        }

        @Override
        public void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception {
            WindowOperatorContractTest.shouldRegisterEventTimeTimerOnElement(mockTrigger, timestamp);
        }

        @Override
        public void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> mockTrigger, long timestamp) throws Exception {
            WindowOperatorContractTest.shouldDeleteEventTimeTimerOnElement(mockTrigger, timestamp);
        }

        @Override
        public void shouldContinueOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldContinueOnEventTime(mockTrigger);
        }

        @Override
        public void shouldFireOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldFireOnEventTime(mockTrigger);
        }

        @Override
        public void shouldFireAndPurgeOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldFireAndPurgeOnEventTime(mockTrigger);
        }

        @Override
        public void shouldPurgeOnTime(Trigger<?, TimeWindow> mockTrigger) throws Exception {
            WindowOperatorContractTest.shouldPurgeOnEventTime(mockTrigger);
        }

        @Override
        public void verifyTriggerCallback(Trigger<?, TimeWindow> mockTrigger, VerificationMode verificationMode, Long time, TimeWindow window) throws Exception {
            if (time == null && window == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onEventTime(Matchers.anyLong(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
            } else if (time == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onEventTime(Matchers.anyLong(), (Window)Matchers.eq((Object)window), WindowOperatorContractTest.anyTriggerContext());
            } else if (window == null) {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onEventTime(((Long)Matchers.eq((Object)time)).longValue(), (Window)WindowOperatorContractTest.anyTimeWindow(), WindowOperatorContractTest.anyTriggerContext());
            } else {
                ((Trigger)Mockito.verify(mockTrigger, (VerificationMode)verificationMode)).onEventTime(((Long)Matchers.eq((Object)time)).longValue(), (Window)Matchers.eq((Object)window), WindowOperatorContractTest.anyTriggerContext());
            }
        }

        @Override
        public void verifyCorrectTime(OneInputStreamOperatorTestHarness testHarness, InternalWindowFunction.InternalWindowContext context) {
            Assert.assertEquals((long)testHarness.getCurrentWatermark(), (long)context.currentWatermark());
        }
    }

    private static interface TimeDomainAdaptor {
        public void setIsEventTime(WindowAssigner<?, ?> var1);

        public void advanceTime(OneInputStreamOperatorTestHarness var1, long var2) throws Exception;

        public void registerTimer(Trigger.TriggerContext var1, long var2);

        public void deleteTimer(Trigger.TriggerContext var1, long var2);

        public int numTimers(AbstractStreamOperatorTestHarness var1);

        public int numTimersOtherDomain(AbstractStreamOperatorTestHarness var1);

        public void shouldRegisterTimerOnElement(Trigger<?, TimeWindow> var1, long var2) throws Exception;

        public void shouldDeleteTimerOnElement(Trigger<?, TimeWindow> var1, long var2) throws Exception;

        public void shouldContinueOnTime(Trigger<?, TimeWindow> var1) throws Exception;

        public void shouldFireOnTime(Trigger<?, TimeWindow> var1) throws Exception;

        public void shouldFireAndPurgeOnTime(Trigger<?, TimeWindow> var1) throws Exception;

        public void shouldPurgeOnTime(Trigger<?, TimeWindow> var1) throws Exception;

        public void verifyTriggerCallback(Trigger<?, TimeWindow> var1, VerificationMode var2, Long var3, TimeWindow var4) throws Exception;

        public void verifyCorrectTime(OneInputStreamOperatorTestHarness var1, InternalWindowFunction.InternalWindowContext var2);
    }
}

