/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests that drive a {@link WindowOperator} through mocked {@link WindowAssigner},
 * {@link Trigger} and {@link InternalWindowFunction} instances while the window contents are
 * kept in reducing or folding state.
 *
 * <p>The class also provides the {@code createWindowOperator} factories declared by
 * {@link WindowOperatorContractTest}; those back the window contents with a
 * {@link ListStateDescriptor}.
 */
public class RegularWindowOperatorContractTest extends WindowOperatorContractTest {

    /**
     * Runs the "FIRE keeps window state" scenario with window contents held in a
     * {@link ReducingStateDescriptor} that sums integers.
     */
    @Test
    public void testReducingWindow() throws Exception {
        ReducingStateDescriptor<Integer> intReduceSumDescriptor =
                new ReducingStateDescriptor<>(
                        "int-reduce",
                        new ReduceFunction<Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer reduce(Integer a, Integer b) throws Exception {
                                return a + b;
                            }
                        },
                        IntSerializer.INSTANCE);

        verifyFireKeepsWindowState(intReduceSumDescriptor);
    }

    /**
     * Runs the "FIRE keeps window state" scenario with window contents held in a
     * {@link FoldingStateDescriptor} that sums integers starting from {@code 0}.
     */
    @Test
    public void testFoldingWindow() throws Exception {
        FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
                new FoldingStateDescriptor<>(
                        "int-fold",
                        0,
                        new FoldFunction<Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer fold(Integer accumulator, Integer value) throws Exception {
                                return accumulator + value;
                            }
                        },
                        IntSerializer.INSTANCE);

        verifyFireKeepsWindowState(intFoldSumDescriptor);
    }

    /**
     * Shared scenario of {@link #testReducingWindow()} and {@link #testFoldingWindow()}.
     *
     * <p>Every element is assigned to the windows {@code [2, 4)} and {@code [0, 2)}. Three
     * elements with value {@code 1} are processed; only for the third one does the mocked
     * trigger register an event-time timer, write trigger state and return
     * {@link TriggerResult#FIRE}. The scenario then checks that the window function saw the
     * aggregate {@code 3} exactly once per window, that {@link Trigger#clear} was never called,
     * and that state and timers are still present afterwards.
     *
     * @param windowStateDescriptor descriptor of the integer-summing window state under test
     * @throws Exception if the harness or any mocked component fails
     */
    private void verifyFireKeepsWindowState(
            StateDescriptor<? extends AppendingState<Integer, Integer>, ?> windowStateDescriptor)
            throws Exception {
        WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
        Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
        InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();

        // State the trigger writes on its own behalf, in addition to the window contents.
        final ValueStateDescriptor<String> valueStateDescriptor =
                new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE);

        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
                createWindowOperator(mockAssigner, mockTrigger, 0L, windowStateDescriptor, mockWindowFunction);
        testHarness.open();

        Mockito.when(mockAssigner.assignWindows(Matchers.anyInt(), Matchers.anyLong(), anyAssignerContext()))
                .thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));

        Assert.assertEquals(0, testHarness.getOutput().size());
        Assert.assertEquals(0, testHarness.numKeyedStateEntries());

        // These two elements are processed before the firing behaviour is installed; the
        // times(2) verification below expects that they cause no window function calls.
        testHarness.processElement(new StreamRecord<>(1, 0L));
        testHarness.processElement(new StreamRecord<>(1, 0L));

        Mockito.doAnswer(new Answer<TriggerResult>() {
            @Override
            public TriggerResult answer(InvocationOnMock invocation) throws Exception {
                // onElement(element, timestamp, window, context): pick window and context.
                TimeWindow window = (TimeWindow) invocation.getArguments()[2];
                Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3];
                context.registerEventTimeTimer(window.getEnd());
                ValueState<String> triggerState = context.getPartitionedState(valueStateDescriptor);
                triggerState.update("hello");
                return TriggerResult.FIRE;
            }
        }).when(mockTrigger).onElement(
                Matchers.<Integer>anyObject(), Matchers.anyLong(), anyTimeWindow(), anyTriggerContext());

        testHarness.processElement(new StreamRecord<>(1, 0L));

        // Exactly two invocations overall, one per window, each carrying the sum 1 + 1 + 1.
        Mockito.verify(mockWindowFunction, Mockito.times(2)).process(
                Matchers.eq(1),
                anyTimeWindow(),
                anyInternalWindowContext(),
                Matchers.anyInt(),
                WindowOperatorContractTest.<Void>anyCollector());
        Mockito.verify(mockWindowFunction, Mockito.times(1)).process(
                Matchers.eq(1),
                Matchers.eq(new TimeWindow(0L, 2L)),
                anyInternalWindowContext(),
                Matchers.eq(3),
                WindowOperatorContractTest.<Void>anyCollector());
        Mockito.verify(mockWindowFunction, Mockito.times(1)).process(
                Matchers.eq(1),
                Matchers.eq(new TimeWindow(2L, 4L)),
                anyInternalWindowContext(),
                Matchers.eq(3),
                WindowOperatorContractTest.<Void>anyCollector());

        // Firing alone must not clear the trigger.
        Mockito.verify(mockTrigger, Mockito.never()).clear(anyTimeWindow(), anyTriggerContext());

        // Firing must not purge anything. Presumably 4 = 2 windows x (window contents +
        // trigger state) -- the breakdown is not visible from this file.
        Assert.assertEquals(4, testHarness.numKeyedStateEntries());
        // Two timers are registered by the trigger above (one per window end); the other two
        // are presumably registered by the operator itself -- TODO confirm.
        Assert.assertEquals(4, testHarness.numEventTimeTimers());
    }

    /**
     * Creates a test harness around a {@link WindowOperator} that keeps its window contents in
     * the given state and has no late-output tag.
     *
     * @param assigner window assigner handed to the operator
     * @param trigger trigger handed to the operator
     * @param allowedLatenss allowed lateness handed to the operator
     * @param stateDescriptor descriptor of the window contents state
     * @param windowFunction window function handed to the operator
     * @return an unopened harness keyed by the element value itself
     * @throws Exception if the operator or the harness cannot be created
     */
    private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, long allowedLatenss, StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception {
        return newTestHarness(assigner, trigger, allowedLatenss, stateDescriptor, windowFunction, null);
    }

    /**
     * Creates a test harness around a {@link WindowOperator} that keeps its window contents in
     * a {@link ListStateDescriptor} named {@code "int-list"}.
     *
     * @param assigner window assigner handed to the operator
     * @param trigger trigger handed to the operator
     * @param allowedLatenss allowed lateness handed to the operator
     * @param windowFunction window function handed to the operator
     * @param lateOutputTag late-output tag handed to the operator; may be {@code null}
     * @return an unopened harness keyed by the element value itself
     * @throws Exception if the operator or the harness cannot be created
     */
    @Override
    protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, long allowedLatenss, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction, OutputTag<Integer> lateOutputTag) throws Exception {
        ListStateDescriptor<Integer> intListDescriptor =
                new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE);
        return newTestHarness(assigner, trigger, allowedLatenss, intListDescriptor, windowFunction, lateOutputTag);
    }

    /**
     * Same as the five-argument variant, with a {@code null} late-output tag.
     *
     * @param assigner window assigner handed to the operator
     * @param trigger trigger handed to the operator
     * @param allowedLatenss allowed lateness handed to the operator
     * @param windowFunction window function handed to the operator
     * @return an unopened harness keyed by the element value itself
     * @throws Exception if the operator or the harness cannot be created
     */
    @Override
    protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, long allowedLatenss, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception {
        return this.createWindowOperator(assigner, trigger, allowedLatenss, windowFunction, null);
    }

    /**
     * Single place where the {@link WindowOperator} and its harness are wired together, so that
     * all {@code createWindowOperator} variants differ only in state descriptor and late tag.
     *
     * @param assigner window assigner handed to the operator
     * @param trigger trigger handed to the operator
     * @param allowedLateness allowed lateness handed to the operator
     * @param stateDescriptor descriptor of the window contents state
     * @param windowFunction window function handed to the operator
     * @param lateOutputTag late-output tag handed to the operator; may be {@code null}
     * @return an unopened harness keyed by the element value itself
     * @throws Exception if the operator or the harness cannot be created
     */
    private static <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> newTestHarness(
            WindowAssigner<Integer, W> assigner,
            Trigger<Integer, W> trigger,
            long allowedLateness,
            StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor,
            InternalWindowFunction<ACC, OUT, Integer, W> windowFunction,
            OutputTag<Integer> lateOutputTag) throws Exception {
        KeySelector<Integer, Integer> keySelector = new IdentityKeySelector();

        WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>(
                assigner,
                assigner.getWindowSerializer(new ExecutionConfig()),
                keySelector,
                IntSerializer.INSTANCE,
                stateDescriptor,
                windowFunction,
                trigger,
                allowedLateness,
                lateOutputTag);

        return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.INT_TYPE_INFO);
    }

    /**
     * Key selector that uses the element itself as its key.
     *
     * <p>Declared as a static nested class rather than an anonymous class so that it carries no
     * reference to the enclosing test instance.
     */
    private static final class IdentityKeySelector implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    }
}

