/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.invokable.operator;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Assert;
import org.junit.Test;

public class GroupedWindowInvokableTest {
    /**
     * Key selector used by the tuple-based tests in this class: it keys a
     * {@code Tuple2<Integer, String>} by its String field {@code f1}.
     */
    KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Tuple2<Integer, String> tuple) throws Exception {
            // f1 is already typed as String, so no cast is required.
            return tuple.f1;
        }
    };

    /**
     * Checks that the {@link GroupedWindowInvokable} constructor throws an
     * {@link UnsupportedOperationException} for each of these policy configurations:
     * <ol>
     * <li>all four policy lists are empty,</li>
     * <li>all four policy lists are {@code null},</li>
     * <li>triggers (central and distributed) are present but no eviction policy is,</li>
     * <li>a central eviction policy is present but no trigger is,</li>
     * <li>both a central and a distributed eviction policy are present.</li>
     * </ol>
     * Each scenario carries its own number in the failure message so a failing run
     * identifies exactly which configuration was wrongly accepted.
     */
    @Test
    public void testGroupedWindowInvokableFailTest() {
        // The function and key selector are never expected to run; construction must fail first.
        ReduceFunction<Object> userFunction = new ReduceFunction<Object>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object reduce(Object value1, Object value2) throws Exception {
                return null;
            }
        };
        // Named differently from the class-level keySelector field so the field is not shadowed.
        KeySelector<Object, Object> nullKeySelector = new KeySelector<Object, Object>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object getKey(Object value) throws Exception {
                return null;
            }
        };
        LinkedList<CountEvictionPolicy> distributedEvictionPolicies = new LinkedList<CountEvictionPolicy>();
        LinkedList<CountTriggerPolicy> distributedTriggerPolicies = new LinkedList<CountTriggerPolicy>();
        LinkedList<CountEvictionPolicy> centralEvictionPolicies = new LinkedList<CountEvictionPolicy>();
        LinkedList<CountTriggerPolicy> centralTriggerPolicies = new LinkedList<CountTriggerPolicy>();

        // (1) Every policy list is empty.
        try {
            new GroupedWindowInvokable((Function)userFunction, (KeySelector)nullKeySelector, distributedTriggerPolicies, distributedEvictionPolicies, centralTriggerPolicies, centralEvictionPolicies);
            Assert.fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)");
        }
        catch (UnsupportedOperationException expected) {
            // Expected: the constructor must reject this configuration.
        }

        // (2) Every policy list is null.
        try {
            new GroupedWindowInvokable((Function)userFunction, (KeySelector)nullKeySelector, null, null, null, null);
            Assert.fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)");
        }
        catch (UnsupportedOperationException expected) {
            // Expected: the constructor must reject this configuration.
        }

        // (3) Triggers only, no eviction policy at all.
        centralTriggerPolicies.add(new CountTriggerPolicy(5));
        distributedTriggerPolicies.add(new CountTriggerPolicy(5));
        try {
            new GroupedWindowInvokable((Function)userFunction, (KeySelector)nullKeySelector, distributedTriggerPolicies, distributedEvictionPolicies, centralTriggerPolicies, centralEvictionPolicies);
            Assert.fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)");
        }
        catch (UnsupportedOperationException expected) {
            // Expected: the constructor must reject this configuration.
        }

        // (4) A central eviction policy only, no trigger at all.
        centralTriggerPolicies.clear();
        distributedTriggerPolicies.clear();
        centralEvictionPolicies.add(new CountEvictionPolicy(5));
        try {
            new GroupedWindowInvokable((Function)userFunction, (KeySelector)nullKeySelector, distributedTriggerPolicies, distributedEvictionPolicies, centralTriggerPolicies, centralEvictionPolicies);
            Assert.fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)");
        }
        catch (UnsupportedOperationException expected) {
            // Expected: the constructor must reject this configuration.
        }

        // (5) A central trigger plus both a central and a distributed eviction policy.
        centralTriggerPolicies.add(new CountTriggerPolicy(5));
        distributedEvictionPolicies.add(new CountEvictionPolicy(5));
        try {
            new GroupedWindowInvokable((Function)userFunction, (KeySelector)nullKeySelector, distributedTriggerPolicies, distributedEvictionPolicies, centralTriggerPolicies, centralEvictionPolicies);
            Assert.fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (5)");
        }
        catch (UnsupportedOperationException expected) {
            // Expected: the constructor must reject this configuration.
        }
    }

    /**
     * Runs a summing reduce, keyed by the element value itself, over a stream of 1s and 5s
     * in two configurations: first with the count trigger and count eviction policy passed
     * as distributed policies, then with the same kind of policies passed as central ones.
     * Each run is compared to its own expected output as a set of distinct values plus the
     * total number of emitted elements, so emission order is not checked.
     */
    @Test
    public void testGroupedWindowInvokableDistributedTriggerSimple() {
        ArrayList<Integer> inputs = new ArrayList<Integer>();
        for (int value : new int[] {1, 1, 5, 5, 5, 1, 1, 5, 1, 5}) {
            inputs.add(value);
        }
        ArrayList<Integer> expectedDistributedEviction = new ArrayList<Integer>();
        for (int value : new int[] {15, 3, 3, 15}) {
            expectedDistributedEviction.add(value);
        }
        ArrayList<Integer> expectedCentralEviction = new ArrayList<Integer>();
        for (int value : new int[] {2, 5, 15, 2, 5, 2, 5, 1, 5}) {
            expectedCentralEviction.add(value);
        }

        LinkedList<CountTriggerPolicy> distributedTriggers = new LinkedList<CountTriggerPolicy>();
        distributedTriggers.add(new CountTriggerPolicy(2, -1));
        LinkedList<CountEvictionPolicy> distributedEvictions = new LinkedList<CountEvictionPolicy>();
        distributedEvictions.add(new CountEvictionPolicy(2, 2, -1));
        LinkedList<CountTriggerPolicy> centralTriggers = new LinkedList<CountTriggerPolicy>();

        ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer reduce(Integer left, Integer right) throws Exception {
                return left + right;
            }
        };
        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) {
                return value;
            }
        };

        // Run 1: trigger and eviction are both distributed; no central eviction list is given.
        GroupedWindowInvokable invokable = new GroupedWindowInvokable((Function)sum, (KeySelector)identityKey, distributedTriggers, distributedEvictions, centralTriggers, null);
        List<Integer> distributedRun = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Integer>(expectedDistributedEviction), new HashSet<Integer>(distributedRun));
        Assert.assertEquals((long)expectedDistributedEviction.size(), (long)distributedRun.size());

        // Run 2: the same kind of trigger and eviction, now supplied as central policies.
        distributedTriggers.clear();
        centralTriggers.add(new CountTriggerPolicy(2, -1));
        LinkedList<CountEvictionPolicy> centralEvictions = new LinkedList<CountEvictionPolicy>();
        centralEvictions.add(new CountEvictionPolicy(2, 2, -1));
        invokable = new GroupedWindowInvokable((Function)sum, (KeySelector)identityKey, distributedTriggers, null, centralTriggers, centralEvictions);
        List<Integer> centralRun = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(centralRun));
        Assert.assertEquals((long)expectedCentralEviction.size(), (long)centralRun.size());
    }

    /**
     * Runs a minimum-by-{@code f0} reduce over tuples keyed by their String field, using a
     * distributed {@code CountTriggerPolicy(3)} together with a distributed
     * {@link TumblingEvictionPolicy} and an empty central trigger list. The output is
     * compared to the expected tuples as a set plus total element count, so emission order
     * is not checked.
     */
    @Test
    public void testGroupedWindowInvokableDistributedTriggerComplex() {
        Object[][] inputRows = {
            {1, "a"}, {0, "b"}, {2, "a"}, {-1, "a"},
            {-2, "a"}, {10, "a"}, {2, "b"}, {1, "a"}
        };
        ArrayList<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>();
        for (Object[] row : inputRows) {
            inputs.add(new Tuple2<Integer, String>((Integer)row[0], (String)row[1]));
        }

        Object[][] expectedRows = {{-1, "a"}, {-2, "a"}, {0, "b"}};
        ArrayList<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>();
        for (Object[] row : expectedRows) {
            expected.add(new Tuple2<Integer, String>((Integer)row[0], (String)row[1]));
        }

        LinkedList<CountTriggerPolicy> distributedTriggers = new LinkedList<CountTriggerPolicy>();
        distributedTriggers.add(new CountTriggerPolicy(3));
        LinkedList<TumblingEvictionPolicy> distributedEvictions = new LinkedList<TumblingEvictionPolicy>();
        distributedEvictions.add(new TumblingEvictionPolicy());
        LinkedList<CountTriggerPolicy> centralTriggers = new LinkedList<CountTriggerPolicy>();

        ReduceFunction<Tuple2<Integer, String>> minByFirstField = new ReduceFunction<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> reduce(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                // On a tie the first argument is kept.
                return left.f0 <= right.f0 ? left : right;
            }
        };

        GroupedWindowInvokable invokable = new GroupedWindowInvokable((Function)minByFirstField, this.keySelector, distributedTriggers, distributedEvictions, centralTriggers, null);
        List<Tuple2<Integer, String>> actual = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Tuple2<Integer, String>>(expected), new HashSet<Tuple2<Integer, String>>(actual));
        Assert.assertEquals((long)expected.size(), (long)actual.size());
    }

    /**
     * Runs a summing reduce over tuples keyed by their String field, using a
     * {@link TimeTriggerPolicy} as the central trigger and the tuple's integer field as the
     * timestamp. The test executes twice with the same input and the same expected output:
     * first with the {@link TimeEvictionPolicy} passed as a distributed eviction policy,
     * then with it passed as a central eviction policy. Results are compared as a set plus
     * total element count, so emission order is not checked.
     */
    @Test
    public void testGroupedWindowInvokableCentralActiveTrigger() {
        Object[][] inputRows = {
            {1, "a"}, {1, "b"}, {1, "c"},
            {2, "a"}, {2, "b"}, {2, "c"},
            {2, "b"}, {2, "a"}, {2, "c"},
            {3, "c"}, {3, "a"}, {3, "b"},
            {4, "a"}, {4, "b"}, {4, "c"},
            {5, "c"}, {5, "a"}, {5, "b"},
            {10, "b"}, {10, "a"}, {10, "c"},
            {11, "a"}, {11, "a"}, {11, "c"},
            {11, "c"}, {11, "b"}, {11, "b"}
        };
        ArrayList<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>();
        for (Object[] row : inputRows) {
            inputs.add(new Tuple2<Integer, String>((Integer)row[0], (String)row[1]));
        }

        // Each expected sum is emitted once per key, in the order a, b, c.
        ArrayList<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>();
        for (int sum : new int[] {12, 12, 5, 10, 32}) {
            for (String key : new String[] {"a", "b", "c"}) {
                expected.add(new Tuple2<Integer, String>(sum, key));
            }
        }

        Timestamp<Tuple2<Integer, String>> fieldTimestamp = new Timestamp<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public long getTimestamp(Tuple2<Integer, String> tuple) {
                return tuple.f0.longValue();
            }
        };
        TimestampWrapper timestampWrapper = new TimestampWrapper((Timestamp)fieldTimestamp, 1L);

        ReduceFunction<Tuple2<Integer, String>> sumFirstField = new ReduceFunction<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> reduce(Tuple2<Integer, String> left, Tuple2<Integer, String> right) throws Exception {
                // The key of the first argument is carried into the result.
                return new Tuple2<Integer, String>(left.f0 + right.f0, left.f1);
            }
        };

        LinkedList<TimeTriggerPolicy> centralTriggers = new LinkedList<TimeTriggerPolicy>();
        centralTriggers.add(new TimeTriggerPolicy(2L, timestampWrapper, 2L));
        LinkedList<TimeEvictionPolicy> distributedEvictions = new LinkedList<TimeEvictionPolicy>();
        distributedEvictions.add(new TimeEvictionPolicy(4L, timestampWrapper));
        LinkedList distributedTriggers = new LinkedList();

        // Run 1: eviction is distributed; no central eviction list is given.
        GroupedWindowInvokable invokable = new GroupedWindowInvokable((Function)sumFirstField, this.keySelector, distributedTriggers, distributedEvictions, centralTriggers, null);
        List<Tuple2<Integer, String>> distributedRun = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Tuple2<Integer, String>>(expected), new HashSet<Tuple2<Integer, String>>(distributedRun));
        Assert.assertEquals((long)expected.size(), (long)distributedRun.size());

        // Run 2: a fresh trigger policy, with the eviction policy moved to the central list.
        centralTriggers.clear();
        centralTriggers.add(new TimeTriggerPolicy(2L, timestampWrapper, 2L));
        distributedEvictions.clear();
        LinkedList<TimeEvictionPolicy> centralEvictions = new LinkedList<TimeEvictionPolicy>();
        centralEvictions.add(new TimeEvictionPolicy(4L, timestampWrapper));
        invokable = new GroupedWindowInvokable((Function)sumFirstField, this.keySelector, distributedTriggers, distributedEvictions, centralTriggers, centralEvictions);
        List<Tuple2<Integer, String>> centralRun = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Tuple2<Integer, String>>(expected), new HashSet<Tuple2<Integer, String>>(centralRun));
        Assert.assertEquals((long)expected.size(), (long)centralRun.size());
    }

    /**
     * Runs a summing reduce, keyed by the element value itself, with two central count
     * triggers ({@code CountTriggerPolicy(8)} and {@code CountTriggerPolicy(5)}), an empty
     * distributed trigger list, and a {@link TumblingEvictionPolicy} wrapped in an
     * {@link ActiveCloneableEvictionPolicyWrapper} as the distributed eviction policy.
     * The output is compared to the expected values as a set plus total element count,
     * so emission order is not checked.
     */
    @Test
    public void testGroupedWindowInvokableMultipleCentralTrigger() {
        LinkedList<CountTriggerPolicy> centralTriggers = new LinkedList<CountTriggerPolicy>();
        centralTriggers.add(new CountTriggerPolicy(8));
        centralTriggers.add(new CountTriggerPolicy(5));
        LinkedList<ActiveCloneableEvictionPolicyWrapper> distributedEvictions = new LinkedList<ActiveCloneableEvictionPolicyWrapper>();
        distributedEvictions.add(new ActiveCloneableEvictionPolicyWrapper((CloneableEvictionPolicy)new TumblingEvictionPolicy()));
        LinkedList distributedTriggers = new LinkedList();

        ArrayList<Integer> inputs = new ArrayList<Integer>();
        for (int value : new int[] {1, 2, 2, 2, 1, 2, 1, 2, 2, 1}) {
            inputs.add(value);
        }
        ArrayList<Integer> expected = new ArrayList<Integer>();
        for (int value : new int[] {2, 6, 4, 1, 2, 1}) {
            expected.add(value);
        }

        ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer reduce(Integer left, Integer right) throws Exception {
                return left + right;
            }
        };
        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) {
                return value;
            }
        };

        GroupedWindowInvokable invokable = new GroupedWindowInvokable((Function)sum, (KeySelector)identityKey, distributedTriggers, distributedEvictions, centralTriggers, null);
        List<Integer> actual = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
        Assert.assertEquals((long)expected.size(), (long)actual.size());
    }

    /**
     * Runs a summing reduce, keyed by the element value itself, with central and distributed
     * triggers active at the same time: two central count triggers
     * ({@code CountTriggerPolicy(8)} and {@code CountTriggerPolicy(5)}) plus a distributed
     * {@code CountTriggerPolicy(2)}. Eviction is a {@link TumblingEvictionPolicy} wrapped in
     * an {@link ActiveCloneableEvictionPolicyWrapper}, passed as the distributed eviction
     * policy. The output is compared to the expected values as a set plus total element
     * count, so emission order is not checked.
     */
    @Test
    public void testGroupedWindowInvokableCentralAndDistrTrigger() {
        LinkedList<CountTriggerPolicy> centralTriggers = new LinkedList<CountTriggerPolicy>();
        centralTriggers.add(new CountTriggerPolicy(8));
        centralTriggers.add(new CountTriggerPolicy(5));
        LinkedList<ActiveCloneableEvictionPolicyWrapper> distributedEvictions = new LinkedList<ActiveCloneableEvictionPolicyWrapper>();
        distributedEvictions.add(new ActiveCloneableEvictionPolicyWrapper((CloneableEvictionPolicy)new TumblingEvictionPolicy()));
        LinkedList<CountTriggerPolicy> distributedTriggers = new LinkedList<CountTriggerPolicy>();
        distributedTriggers.add(new CountTriggerPolicy(2));

        ArrayList<Integer> inputs = new ArrayList<Integer>();
        for (int value : new int[] {1, 2, 2, 2, 1, 2, 1, 2, 2, 1}) {
            inputs.add(value);
        }
        ArrayList<Integer> expected = new ArrayList<Integer>();
        for (int value : new int[] {4, 2, 2, 2, 1, 2, 1, 2}) {
            expected.add(value);
        }

        ReduceFunction<Integer> sum = new ReduceFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer reduce(Integer left, Integer right) throws Exception {
                return left + right;
            }
        };
        KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(Integer value) {
                return value;
            }
        };

        GroupedWindowInvokable invokable = new GroupedWindowInvokable((Function)sum, (KeySelector)identityKey, distributedTriggers, distributedEvictions, centralTriggers, null);
        List<Integer> actual = MockContext.createAndExecute(invokable, inputs);
        Assert.assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
        Assert.assertEquals((long)expected.size(), (long)actual.size());
    }
}

