/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.CollectingOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class AggregatingAlignedProcessingTimeWindowOperatorTest {
    private final ReduceFunction<String> mockFunction = (ReduceFunction)Mockito.mock(ReduceFunction.class);
    private final KeySelector<String, String> mockKeySelector = (KeySelector)Mockito.mock(KeySelector.class);
    private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = new KeySelector<Tuple2<Integer, Integer>, Integer>(){

        public Integer getKey(Tuple2<Integer, Integer> value) {
            return (Integer)value.f0;
        }
    };
    private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>(){

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) {
            return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    };
    private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}).createSerializer(new ExecutionConfig());
    private final Comparator<Tuple2<Integer, Integer>> tupleComparator = new Comparator<Tuple2<Integer, Integer>>(){

        @Override
        public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
            int diff0 = (Integer)o1.f0 - (Integer)o2.f0;
            int diff1 = (Integer)o1.f1 - (Integer)o2.f1;
            return diff0 != 0 ? diff0 : diff1;
        }
    };

    public AggregatingAlignedProcessingTimeWindowOperatorTest() {
        ClosureCleaner.clean(this.fieldOneSelector, (boolean)false);
        ClosureCleaner.clean(this.sumFunction, (boolean)false);
    }

    @After
    public void checkNoTriggerThreadsRunning() {
        long deadline = System.currentTimeMillis() + 5000L;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException interruptedException) {}
        }
        Assert.assertTrue((String)"Not all trigger threads where properly shut down", (StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0 ? 1 : 0) != 0);
    }

    @Test
    public void testInvalidParameters() {
        try {
            this.assertInvalidParameter(-1L, -1L);
            this.assertInvalidParameter(10000L, -1L);
            this.assertInvalidParameter(-1L, 1000L);
            this.assertInvalidParameter(1000L, 2000L);
            this.assertInvalidParameter(1000L, 999L);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testWindowSizeAndSlide() {
        try {
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 5000L, 1000L);
            Assert.assertEquals((long)5000L, (long)op.getWindowSize());
            Assert.assertEquals((long)1000L, (long)op.getWindowSlide());
            Assert.assertEquals((long)1000L, (long)op.getPaneSize());
            Assert.assertEquals((long)5L, (long)op.getNumPanesPerWindow());
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1000L, 1000L);
            Assert.assertEquals((long)1000L, (long)op.getWindowSize());
            Assert.assertEquals((long)1000L, (long)op.getWindowSlide());
            Assert.assertEquals((long)1000L, (long)op.getPaneSize());
            Assert.assertEquals((long)1L, (long)op.getNumPanesPerWindow());
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1500L, 1000L);
            Assert.assertEquals((long)1500L, (long)op.getWindowSize());
            Assert.assertEquals((long)1000L, (long)op.getWindowSlide());
            Assert.assertEquals((long)500L, (long)op.getPaneSize());
            Assert.assertEquals((long)3L, (long)op.getNumPanesPerWindow());
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1200L, 1100L);
            Assert.assertEquals((long)1200L, (long)op.getWindowSize());
            Assert.assertEquals((long)1100L, (long)op.getWindowSlide());
            Assert.assertEquals((long)100L, (long)op.getPaneSize());
            Assert.assertEquals((long)12L, (long)op.getNumPanesPerWindow());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testWindowTriggerTimeAlignment() {
        try {
            Output mockOut = (Output)Mockito.mock(Output.class);
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTask();
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 5000L, 1000L);
            op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
            op.open();
            Assert.assertTrue((op.getNextSlideTime() % 1000L == 0L ? 1 : 0) != 0);
            Assert.assertTrue((op.getNextEvaluationTime() % 1000L == 0L ? 1 : 0) != 0);
            op.dispose();
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1000L, 1000L);
            op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
            op.open();
            Assert.assertTrue((op.getNextSlideTime() % 1000L == 0L ? 1 : 0) != 0);
            Assert.assertTrue((op.getNextEvaluationTime() % 1000L == 0L ? 1 : 0) != 0);
            op.dispose();
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1500L, 1000L);
            op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
            op.open();
            Assert.assertTrue((op.getNextSlideTime() % 500L == 0L ? 1 : 0) != 0);
            Assert.assertTrue((op.getNextEvaluationTime() % 1000L == 0L ? 1 : 0) != 0);
            op.dispose();
            op = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, 1200L, 1100L);
            op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
            op.open();
            Assert.assertTrue((op.getNextSlideTime() % 100L == 0L ? 1 : 0) != 0);
            Assert.assertTrue((op.getNextEvaluationTime() % 1100L == 0L ? 1 : 0) != 0);
            op.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTumblingWindowUniqueElements() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            Object object;
            int windowSize = 50;
            CollectingOutput out = new CollectingOutput(50);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 50L, 50L);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            int numElements = 1000;
            for (int i = 0; i < 1000; ++i) {
                object = lock;
                synchronized (object) {
                    StreamRecord next = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next);
                    op.processElement(next);
                }
                Thread.sleep(1L);
            }
            out.waitForNElements(1000, 60000L);
            List result = out.getElements();
            Assert.assertEquals((long)1000L, (long)result.size());
            object = lock;
            synchronized (object) {
                op.close();
            }
            op.dispose();
            Collections.sort(result, this.tupleComparator);
            for (int i = 0; i < 1000; ++i) {
                Assert.assertEquals((long)i, (long)((Integer)((Tuple2)result.get((int)i)).f0).intValue());
                Assert.assertEquals((long)i, (long)((Integer)((Tuple2)result.get((int)i)).f1).intValue());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTumblingWindowDuplicateElements() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            int windowSize = 50;
            CollectingOutput out = new CollectingOutput(50);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 50L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            int numWindows = 10;
            long previousNextTime = 0L;
            int window = 1;
            while (window <= 10) {
                Object object = lock;
                synchronized (object) {
                    long nextTime = op.getNextEvaluationTime();
                    int val = (int)nextTime ^ (int)(nextTime >>> 32);
                    StreamRecord next = new StreamRecord((Object)new Tuple2((Object)val, (Object)val));
                    op.setKeyContextElement1(next);
                    op.processElement(next);
                    if (nextTime != previousNextTime) {
                        ++window;
                        previousNextTime = nextTime;
                    }
                }
                Thread.sleep(1L);
            }
            out.waitForNElements(10, 60000L);
            List result = out.getElements();
            Object object = lock;
            synchronized (object) {
                op.close();
            }
            op.dispose();
            Assert.assertTrue((result.size() >= 10 && result.size() <= 20 ? 1 : 0) != 0);
            HashSet set = new HashSet(result);
            Assert.assertTrue((set.size() == 10 ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlidingWindow() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CollectingOutput out = new CollectingOutput(50);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            int numElements = 1000;
            for (int i = 0; i < 1000; ++i) {
                Object object = lock;
                synchronized (object) {
                    StreamRecord next = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next);
                    op.processElement(next);
                }
                Thread.sleep(1L);
            }
            Object i = lock;
            synchronized (i) {
                op.close();
            }
            op.dispose();
            List result = out.getElements();
            if (result.size() < 1000 || result.size() > 3000) {
                System.out.println(result);
                Assert.fail((String)("Wrong number of results: " + result.size()));
            }
            Collections.sort(result, this.tupleComparator);
            int lastNum = -1;
            int lastCount = -1;
            for (Tuple2 val : result) {
                Assert.assertEquals((Object)val.f0, (Object)val.f1);
                if ((Integer)val.f0 == lastNum) {
                    Assert.assertTrue((++lastCount <= 3 ? 1 : 0) != 0);
                    continue;
                }
                lastNum = (Integer)val.f0;
                lastCount = 1;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSlidingWindowSingleElements() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CollectingOutput out = new CollectingOutput(50);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            Object object = lock;
            synchronized (object) {
                StreamRecord next1 = new StreamRecord((Object)new Tuple2((Object)1, (Object)1));
                op.setKeyContextElement1(next1);
                op.processElement(next1);
                StreamRecord next2 = new StreamRecord((Object)new Tuple2((Object)2, (Object)2));
                op.setKeyContextElement1(next2);
                op.processElement(next2);
            }
            out.waitForNElements(6, 120000L);
            List result = out.getElements();
            Assert.assertEquals((long)6L, (long)result.size());
            Collections.sort(result, this.tupleComparator);
            Assert.assertEquals(Arrays.asList(new Tuple2((Object)1, (Object)1), new Tuple2((Object)1, (Object)1), new Tuple2((Object)1, (Object)1), new Tuple2((Object)2, (Object)2), new Tuple2((Object)2, (Object)2), new Tuple2((Object)2, (Object)2)), result);
            Object object2 = lock;
            synchronized (object2) {
                op.close();
            }
            op.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPropagateExceptionsFromProcessElement() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CollectingOutput out = new CollectingOutput();
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            FailingFunction failingFunction = new FailingFunction(100);
            long hundredYears = 3153600000000L;
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator((ReduceFunction)failingFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 3153600000000L, 3153600000000L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            for (int i = 0; i < 100; ++i) {
                Object object = lock;
                synchronized (object) {
                    StreamRecord next = new StreamRecord((Object)new Tuple2((Object)1, (Object)1));
                    op.setKeyContextElement1(next);
                    op.processElement(next);
                    continue;
                }
            }
            try {
                StreamRecord next = new StreamRecord((Object)new Tuple2((Object)1, (Object)1));
                op.setKeyContextElement1(next);
                op.processElement(next);
                Assert.fail((String)"This fail with an exception");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)e.getMessage().contains("Artificial Test Exception"));
            }
            op.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void checkpointRestoreWithPendingWindowTumbling() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            ArrayList resultAtSnapshot;
            StreamTaskState state;
            Object next;
            int windowSize = 200;
            CollectingOutput out = new CollectingOutput(200);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 200L, 200L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            int numElementsFirst = 700;
            int numElements = 1000;
            for (int i = 0; i < 700; ++i) {
                Object object = lock;
                synchronized (object) {
                    next = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1((StreamRecord)next);
                    op.processElement((StreamRecord)next);
                }
                Thread.sleep(1L);
            }
            next = lock;
            synchronized (next) {
                int beforeSnapShot = out.getElements().size();
                state = op.snapshotOperatorState(1L, System.currentTimeMillis());
                resultAtSnapshot = new ArrayList(out.getElements());
                int afterSnapShot = out.getElements().size();
                Assert.assertEquals((String)"operator performed computation during snapshot", (long)beforeSnapShot, (long)afterSnapShot);
            }
            Assert.assertTrue((resultAtSnapshot.size() <= 700 ? 1 : 0) != 0);
            for (int i = 700; i < 1000; ++i) {
                Object beforeSnapShot = lock;
                synchronized (beforeSnapShot) {
                    StreamRecord next2 = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next2);
                    op.processElement(next2);
                }
                Thread.sleep(1L);
            }
            op.dispose();
            CollectingOutput out2 = new CollectingOutput(200);
            op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 200L, 200L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out2);
            op.restoreState(state);
            op.open();
            for (int i = 700; i < 1000; ++i) {
                Object next2 = lock;
                synchronized (next2) {
                    StreamRecord next3 = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next3);
                    op.processElement(next3);
                }
                Thread.sleep(1L);
            }
            out2.waitForNElements(1000 - resultAtSnapshot.size(), 60000L);
            ArrayList finalResult = new ArrayList(resultAtSnapshot);
            finalResult.addAll(out2.getElements());
            Assert.assertEquals((long)1000L, (long)finalResult.size());
            Object next2 = lock;
            synchronized (next2) {
                op.close();
            }
            op.dispose();
            Collections.sort(finalResult, this.tupleComparator);
            for (int i = 0; i < 1000; ++i) {
                Assert.assertEquals((long)i, (long)((Integer)((Tuple2)finalResult.get((int)i)).f0).intValue());
                Assert.assertEquals((long)i, (long)((Integer)((Tuple2)finalResult.get((int)i)).f1).intValue());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void checkpointRestoreWithPendingWindowSliding() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            Object next;
            ArrayList resultAtSnapshot;
            StreamTaskState state;
            Object next2;
            int factor = 4;
            int windowSlide = 50;
            int windowSize = 200;
            CollectingOutput out = new CollectingOutput(50);
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 200L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();
            int numElements = 1000;
            int numElementsFirst = 700;
            for (int i = 0; i < 700; ++i) {
                Object object = lock;
                synchronized (object) {
                    next2 = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1((StreamRecord)next2);
                    op.processElement((StreamRecord)next2);
                }
                Thread.sleep(1L);
            }
            next2 = lock;
            synchronized (next2) {
                int beforeSnapShot = out.getElements().size();
                state = op.snapshotOperatorState(1L, System.currentTimeMillis());
                resultAtSnapshot = new ArrayList(out.getElements());
                int afterSnapShot = out.getElements().size();
                Assert.assertEquals((String)"operator performed computation during snapshot", (long)beforeSnapShot, (long)afterSnapShot);
            }
            Assert.assertTrue((resultAtSnapshot.size() <= 2800 ? 1 : 0) != 0);
            for (int i = 700; i < 1000; ++i) {
                Object beforeSnapShot = lock;
                synchronized (beforeSnapShot) {
                    StreamRecord next3 = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next3);
                    op.processElement(next3);
                }
                Thread.sleep(1L);
            }
            op.dispose();
            CollectingOutput out2 = new CollectingOutput(50);
            op = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 200L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out2);
            op.restoreState(state);
            op.open();
            for (int i = 700; i < 1000; ++i) {
                Object object = lock;
                synchronized (object) {
                    next = new StreamRecord((Object)new Tuple2((Object)i, (Object)i));
                    op.setKeyContextElement1(next);
                    op.processElement(next);
                }
                Thread.sleep(1L);
            }
            long deadline = System.currentTimeMillis() + 120000L;
            do {
                Thread.sleep(20L);
            } while (resultAtSnapshot.size() + out2.getElements().size() < 4000 && System.currentTimeMillis() < deadline);
            next = lock;
            synchronized (next) {
                op.close();
            }
            op.dispose();
            ArrayList finalResult = new ArrayList(resultAtSnapshot);
            finalResult.addAll(out2.getElements());
            Assert.assertEquals((long)4000L, (long)finalResult.size());
            Collections.sort(finalResult, this.tupleComparator);
            for (int i = 0; i < 4000; ++i) {
                Assert.assertEquals((long)(i / 4), (long)((Integer)((Tuple2)finalResult.get((int)i)).f0).intValue());
                Assert.assertEquals((long)(i / 4), (long)((Integer)((Tuple2)finalResult.get((int)i)).f1).intValue());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyValueStateInWindowFunctionTumbling() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            long twoSeconds = 2000L;
            CollectingOutput out = new CollectingOutput();
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            StatefulFunction.globalCounts.clear();
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator((ReduceFunction)new StatefulFunction(), this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 2000L, 2000L);
            op.setup(mockTask, AggregatingAlignedProcessingTimeWindowOperatorTest.createTaskConfig(this.fieldOneSelector, IntSerializer.INSTANCE), out);
            op.open();
            Object object = lock;
            synchronized (object) {
                for (int i = 0; i < 10; ++i) {
                    StreamRecord next1 = new StreamRecord((Object)new Tuple2((Object)1, (Object)i));
                    op.setKeyContextElement1(next1);
                    op.processElement(next1);
                    StreamRecord next2 = new StreamRecord((Object)new Tuple2((Object)2, (Object)i));
                    op.setKeyContextElement1(next2);
                    op.processElement(next2);
                }
            }
            while (StatefulFunction.globalCounts.get(1) < 10 || StatefulFunction.globalCounts.get(2) < 10) {
                Thread.sleep(50L);
            }
            op.close();
            op.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testKeyValueStateInWindowFunctionSliding() {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            int factor = 2;
            int windowSlide = 50;
            int windowSize = 100;
            CollectingOutput out = new CollectingOutput();
            Object lock = new Object();
            StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);
            StatefulFunction.globalCounts.clear();
            AggregatingProcessingTimeWindowOperator op = new AggregatingProcessingTimeWindowOperator((ReduceFunction)new StatefulFunction(), this.fieldOneSelector, (TypeSerializer)IntSerializer.INSTANCE, this.tupleSerializer, 100L, 50L);
            op.setup(mockTask, AggregatingAlignedProcessingTimeWindowOperatorTest.createTaskConfig(this.fieldOneSelector, IntSerializer.INSTANCE), out);
            op.open();
            int numElements = 100;
            for (int i = 0; i < 100; ++i) {
                StreamRecord next1 = new StreamRecord((Object)new Tuple2((Object)1, (Object)i));
                StreamRecord next2 = new StreamRecord((Object)new Tuple2((Object)2, (Object)i));
                StreamRecord next3 = new StreamRecord((Object)new Tuple2((Object)1, (Object)i));
                StreamRecord next4 = new StreamRecord((Object)new Tuple2((Object)2, (Object)i));
                Object object = lock;
                synchronized (object) {
                    op.setKeyContextElement1(next1);
                    op.processElement(next1);
                    op.setKeyContextElement1(next2);
                    op.processElement(next2);
                    op.setKeyContextElement1(next3);
                    op.processElement(next3);
                    op.setKeyContextElement1(next4);
                    op.processElement(next4);
                }
                Thread.sleep(1L);
            }
            Object i = lock;
            synchronized (i) {
                op.close();
            }
            int count1 = StatefulFunction.globalCounts.get(1);
            int count2 = StatefulFunction.globalCounts.get(2);
            Assert.assertTrue((count1 >= 2 && count1 <= 200 ? 1 : 0) != 0);
            Assert.assertEquals((long)count1, (long)count2);
            op.dispose();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            timerService.shutdown();
        }
    }

    private void assertInvalidParameter(long windowSize, long windowSlide) {
        try {
            new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, windowSize, windowSlide);
            Assert.fail((String)"This should fail with an IllegalArgumentException");
        }
        catch (IllegalArgumentException illegalArgumentException) {
        }
        catch (Exception e) {
            Assert.fail((String)("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName()));
        }
    }

    private static StreamTask<?, ?> createMockTask() {
        StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
        Mockito.when((Object)task.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when((Object)task.getName()).thenReturn((Object)"Test task name");
        Mockito.when((Object)task.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
        DummyEnvironment env = new DummyEnvironment("Test task name", 1, 0);
        Mockito.when((Object)task.getEnvironment()).thenReturn((Object)env);
        try {
            ((StreamTask)Mockito.doAnswer((Answer)new Answer<AbstractStateBackend>((Environment)env){
                final /* synthetic */ Environment val$env;
                {
                    this.val$env = environment;
                }

                public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    String operatorIdentifier = (String)invocationOnMock.getArguments()[0];
                    TypeSerializer keySerializer = (TypeSerializer)invocationOnMock.getArguments()[1];
                    MemoryStateBackend backend = MemoryStateBackend.create();
                    backend.initializeForJob(this.val$env, operatorIdentifier, keySerializer);
                    return backend;
                }
            }).when((Object)task)).createStateBackend((String)Matchers.any(String.class), (TypeSerializer)Matchers.any(TypeSerializer.class));
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return task;
    }

    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService timerService, final Object lock) {
        StreamTask<?, ?> mockTask = AggregatingAlignedProcessingTimeWindowOperatorTest.createMockTask();
        ((StreamTask)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                final Long timestamp = (Long)invocationOnMock.getArguments()[0];
                final Triggerable target = (Triggerable)invocationOnMock.getArguments()[1];
                timerService.schedule(new Callable<Object>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public Object call() throws Exception {
                        Object object = lock;
                        synchronized (object) {
                            target.trigger(timestamp.longValue());
                        }
                        return null;
                    }
                }, timestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return null;
            }
        }).when(mockTask)).registerTimer(Matchers.anyLong(), (Triggerable)Matchers.any(Triggerable.class));
        return mockTask;
    }

    private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStatePartitioner(0, partitioner);
        cfg.setStateKeySerializer(keySerializer);
        return cfg;
    }

    private static class StatefulFunction
    extends RichReduceFunction<Tuple2<Integer, Integer>> {
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<Integer, Integer>();
        private ValueState<Integer> state;

        private StatefulFunction() {
        }

        public void open(Configuration parameters) {
            Assert.assertNotNull((Object)this.getRuntimeContext());
            this.state = this.getRuntimeContext().getState(new ValueStateDescriptor("totalCount", Integer.class, (Object)1));
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
            this.state.update((Object)((Integer)this.state.value() + 1));
            globalCounts.put((Integer)value1.f0, (Integer)this.state.value());
            return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }

    private static class FailingFunction
    implements ReduceFunction<Tuple2<Integer, Integer>> {
        private final int failAfterElements;
        private int numElements;

        FailingFunction(int failAfterElements) {
            this.failAfterElements = failAfterElements;
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
            ++this.numElements;
            if (this.numElements >= this.failAfterElements) {
                throw new Exception("Artificial Test Exception");
            }
            return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }
}

