/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.invokable.operator;

import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Assert;
import org.junit.Test;

public class WindowInvokableTest {
    @Test
    public void testWindowInvokableWithTimePolicy() {
        ArrayList<Integer> inputs = new ArrayList<Integer>();
        inputs.add(1);
        inputs.add(2);
        inputs.add(2);
        inputs.add(3);
        inputs.add(4);
        inputs.add(5);
        inputs.add(10);
        inputs.add(11);
        inputs.add(11);
        ArrayList<Integer> expected = new ArrayList<Integer>();
        expected.add(12);
        expected.add(12);
        expected.add(5);
        expected.add(10);
        expected.add(32);
        Timestamp<Integer> myTimeStamp = new Timestamp<Integer>(){
            private static final long serialVersionUID = 1L;

            public long getTimestamp(Integer value) {
                return value.intValue();
            }
        };
        ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        };
        LinkedList<TimeTriggerPolicy> triggers = new LinkedList<TimeTriggerPolicy>();
        triggers.add(new TimeTriggerPolicy(2L, new TimestampWrapper((Timestamp)myTimeStamp, 1L), 2L));
        LinkedList<TimeEvictionPolicy> evictions = new LinkedList<TimeEvictionPolicy>();
        evictions.add(new TimeEvictionPolicy(4L, new TimestampWrapper((Timestamp)myTimeStamp, 1L)));
        WindowReduceInvokable invokable = new WindowReduceInvokable((ReduceFunction)myReduceFunction, triggers, evictions);
        ArrayList<Integer> result = new ArrayList<Integer>();
        for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
            result.add(t);
        }
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testWindowInvokableWithCountPolicy() {
        ArrayList<Integer> inputs = new ArrayList<Integer>();
        Integer i = 1;
        while (i <= 10) {
            inputs.add(i);
            Integer n = i;
            Integer n2 = i = Integer.valueOf(i + 1);
        }
        Object myReduceFunction = new ReduceFunction<Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        };
        LinkedList<CountTriggerPolicy> triggers = new LinkedList<CountTriggerPolicy>();
        triggers.add(new CountTriggerPolicy(2, -1));
        LinkedList<CountEvictionPolicy> evictions = new LinkedList<CountEvictionPolicy>();
        evictions.add(new CountEvictionPolicy(2, 2, -1));
        WindowReduceInvokable invokable = new WindowReduceInvokable((ReduceFunction)myReduceFunction, triggers, evictions);
        ArrayList<Integer> expected = new ArrayList<Integer>();
        expected.add(6);
        expected.add(12);
        expected.add(18);
        expected.add(24);
        expected.add(19);
        ArrayList<Integer> result = new ArrayList<Integer>();
        for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
            result.add(t);
        }
        Assert.assertEquals(expected, result);
        ArrayList<Integer> inputs2 = new ArrayList<Integer>();
        inputs2.add(1);
        inputs2.add(2);
        inputs2.add(-5);
        inputs2.add(-3);
        inputs2.add(-4);
        myReduceFunction = new ReduceFunction<Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                if (value1 <= value2) {
                    return value1;
                }
                return value2;
            }
        };
        triggers = new LinkedList();
        triggers.add(new CountTriggerPolicy(3, 1));
        evictions = new LinkedList();
        evictions.add(new CountEvictionPolicy(3, 3, -1));
        WindowReduceInvokable invokable2 = new WindowReduceInvokable((ReduceFunction)myReduceFunction, triggers, evictions);
        ArrayList<Integer> expected2 = new ArrayList<Integer>();
        expected2.add(1);
        expected2.add(-4);
        result = new ArrayList();
        for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) {
            result.add(t);
        }
        Assert.assertEquals(expected2, result);
    }

    @Test
    public void testWindowInvokableWithMultiplePolicies() {
        LinkedList<CountTriggerPolicy> triggers = new LinkedList<CountTriggerPolicy>();
        triggers.add(new CountTriggerPolicy(2));
        triggers.add(new CountTriggerPolicy(3));
        LinkedList<CountEvictionPolicy> evictions = new LinkedList<CountEvictionPolicy>();
        evictions.add(new CountEvictionPolicy(2, 2));
        evictions.add(new CountEvictionPolicy(3, 3));
        ArrayList<Integer> inputs = new ArrayList<Integer>();
        Integer i = 1;
        while (i <= 10) {
            inputs.add(i);
            Integer n = i;
            Integer n2 = i = Integer.valueOf(i + 1);
        }
        ArrayList<Integer> expected = new ArrayList<Integer>();
        expected.add(3);
        expected.add(3);
        expected.add(4);
        expected.add(11);
        expected.add(15);
        expected.add(9);
        expected.add(10);
        ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>(){
            private static final long serialVersionUID = 1L;

            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        };
        WindowReduceInvokable invokable = new WindowReduceInvokable((ReduceFunction)myReduceFunction, triggers, evictions);
        ArrayList<Integer> result = new ArrayList<Integer>();
        for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
            result.add(t);
        }
        Assert.assertEquals(expected, result);
    }
}

