/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest;
import org.apache.flink.streaming.runtime.operators.windowing.CollectingOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
    /** Mocked window function, used by the tests that only check constructor arguments and trigger timing. */
    private final WindowFunction<String, String, String, TimeWindow> mockFunction =
            (WindowFunction)Mockito.mock(WindowFunction.class);

    /** Mocked key selector that is passed to the operator together with {@code mockFunction}. */
    private final KeySelector<String, String> mockKeySelector =
            (KeySelector)Mockito.mock(KeySelector.class);

    /** Key selector that returns every integer element as its own key. */
    private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {

        @Override
        public Integer getKey(Integer element) {
            return element;
        }
    };

    /**
     * Window function that forwards every element unchanged, after asserting that the element
     * equals the key passed to the call.
     */
    private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = new WindowFunction<Integer, Integer, Integer, TimeWindow>() {

        @Override
        public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
            Iterator<Integer> remaining = values.iterator();
            while (remaining.hasNext()) {
                Integer element = remaining.next();
                Assert.assertEquals((Object)key, (Object)element);
                out.collect(element);
            }
        }
    };

    /**
     * Passes the two anonymous helper instances through {@code ClosureCleaner.clean}, each with
     * {@code false} as the second argument.
     */
    public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
        ClosureCleaner.clean(identitySelector, false);
        ClosureCleaner.clean(validatingIdentityFunction, false);
    }

    /**
     * Runs after every test: polls for up to five seconds until
     * {@code StreamTask.TRIGGER_THREAD_GROUP} reports no active threads, then fails the test if
     * any are still active.
     */
    @After
    public void checkNoTriggerThreadsRunning() {
        long deadline = System.currentTimeMillis() + 5000L;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag for the caller and stop
                // waiting; the assertion below still reports threads that are left over.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Assert.assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
    }

    /**
     * Verifies that constructing the operator with each of the listed size/slide pairs is
     * rejected (see {@code assertInvalidParameter}).
     */
    @Test
    public void testInvalidParameters() {
        // each row is {windowSize, windowSlide}
        long[][] invalidSizeAndSlide = {
            {-1L, -1L},
            {10000L, -1L},
            {-1L, 1000L},
            {1000L, 2000L},
            {1000L, 999L},
        };
        try {
            for (long[] pair : invalidSizeAndSlide) {
                assertInvalidParameter(pair[0], pair[1]);
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks the window size, window slide, pane size and number of panes per window that the
     * operator reports for four size/slide combinations.
     */
    @Test
    public void testWindowSizeAndSlide() {
        try {
            // arguments: window size, window slide, expected pane size, expected panes per window
            assertWindowGeometry(5000L, 1000L, 1000L, 5L);
            assertWindowGeometry(1000L, 1000L, 1000L, 1L);
            assertWindowGeometry(1500L, 1000L, 500L, 3L);
            assertWindowGeometry(1200L, 1100L, 100L, 12L);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** Builds an operator with the given size and slide and asserts the values its getters report. */
    private void assertWindowGeometry(long windowSize, long windowSlide, long expectedPaneSize, long expectedPanesPerWindow) throws Exception {
        AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, windowSize, windowSlide);
        Assert.assertEquals(windowSize, (long)op.getWindowSize());
        Assert.assertEquals(windowSlide, (long)op.getWindowSlide());
        Assert.assertEquals(expectedPaneSize, (long)op.getPaneSize());
        Assert.assertEquals(expectedPanesPerWindow, (long)op.getNumPanesPerWindow());
    }

    /**
     * Opens the operator with four size/slide combinations and checks that the next slide time
     * and the next evaluation time it reports are whole multiples of the expected divisors.
     */
    @Test
    public void testWindowTriggerTimeAlignment() {
        try {
            Output mockOut = (Output)Mockito.mock(Output.class);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTask();
            // arguments after the mocks: window size, window slide,
            // divisor of the next slide time, divisor of the next evaluation time
            assertTriggerAlignment(mockTask, mockOut, 5000L, 1000L, 1000L, 1000L);
            assertTriggerAlignment(mockTask, mockOut, 1000L, 1000L, 1000L, 1000L);
            assertTriggerAlignment(mockTask, mockOut, 1500L, 1000L, 500L, 1000L);
            assertTriggerAlignment(mockTask, mockOut, 1200L, 1100L, 100L, 1100L);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /** Sets up and opens one operator, asserts the alignment of its two trigger times, then disposes it. */
    private void assertTriggerAlignment(StreamTask<?, ?> task, Output output, long windowSize, long windowSlide, long slideTimeDivisor, long evaluationTimeDivisor) throws Exception {
        AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, windowSize, windowSlide);
        op.setup(task, new StreamConfig(new Configuration()), output);
        op.open();
        Assert.assertTrue(op.getNextSlideTime() % slideTimeDivisor == 0L);
        Assert.assertTrue(op.getNextEvaluationTime() % evaluationTimeDivisor == 0L);
        op.dispose();
    }

    /**
     * Feeds 1000 distinct integers, roughly one per millisecond, into an operator whose window
     * size and slide are both 50, with timers run by a real scheduled executor, and verifies
     * that the collected output is exactly the values 0..999.
     */
    @Test
    public void testTumblingWindow() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int windowSize = 50;
            final int numElements = 1000;
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(windowSize);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, (long)windowSize, (long)windowSize);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            // elements are added while holding the same lock the timer callbacks take
            int next = 0;
            while (next < numElements) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(next));
                }
                Thread.sleep(1L);
                ++next;
            }

            out.waitForNElements(numElements, 60000L);
            synchronized (lock) {
                op.close();
            }
            op.dispose();

            // every value must be present exactly once
            List result = out.getElements();
            Assert.assertEquals((long)numElements, (long)result.size());
            Collections.sort(result);
            int expected = 0;
            for (Object element : result) {
                Assert.assertEquals((long)expected, (long)((Integer)element).intValue());
                ++expected;
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Feeds 1000 distinct integers into an operator with window size 150 and slide 50, closes
     * it, and verifies that between 1000 and 3000 results were produced and that no value
     * occurs more than three times.
     */
    @Test
    public void testSlidingWindow() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int numElements = 1000;
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(50);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            int next = 0;
            while (next < numElements) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(next));
                }
                Thread.sleep(1L);
                ++next;
            }

            synchronized (lock) {
                op.close();
            }
            op.dispose();

            List result = out.getElements();
            boolean plausibleCount = result.size() >= numElements && result.size() <= 3 * numElements;
            if (!plausibleCount) {
                Assert.fail("Wrong number of results: " + result.size());
            }

            // after sorting, equal values are adjacent, so runs can be counted in one pass
            Collections.sort(result);
            int previous = -1;
            int repetitions = -1;
            for (Object element : result) {
                int current = (Integer)element;
                if (current != previous) {
                    previous = current;
                    repetitions = 1;
                } else {
                    ++repetitions;
                    Assert.assertTrue(repetitions <= 3);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Adds the values 1..6 in three batches (two, three and one element) to an operator with
     * window size and slide 50, waiting for the output after each batch, and verifies that the
     * output is exactly 1..6.
     */
    @Test
    public void testTumblingWindowSingleElements() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(50);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            // first batch: 1, 2
            synchronized (lock) {
                for (int value = 1; value <= 2; ++value) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(2, 60000L);

            // second batch: 3, 4, 5
            synchronized (lock) {
                for (int value = 3; value <= 5; ++value) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(5, 60000L);

            // third batch: 6
            synchronized (lock) {
                op.processElement(new StreamRecord(6));
            }
            out.waitForNElements(6, 60000L);

            List result = out.getElements();
            Assert.assertEquals(6L, (long)result.size());
            Collections.sort(result);
            Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Adds the values 1 and 2 to an operator with window size 150 and slide 50 and verifies
     * that six results arrive, three for each value.
     */
    @Test
    public void testSlidingWindowSingleElements() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(50);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            synchronized (lock) {
                for (int value = 1; value <= 2; ++value) {
                    op.processElement(new StreamRecord(value));
                }
            }

            out.waitForNElements(6, 120000L);
            List result = out.getElements();
            Assert.assertEquals(6L, (long)result.size());
            Collections.sort(result);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Snapshot/restore test for a tumbling window (size and slide 200).
     *
     * <p>Steps: feed 700 elements, take a snapshot under the lock (asserting that the snapshot
     * itself emits nothing), feed 300 more and dispose the operator without closing it, then
     * restore a fresh operator from the snapshot, feed elements 700..999 again and verify that
     * the output at snapshot time plus the output of the restored operator is exactly 0..999.
     *
     * <p>The restored operator is closed under the lock and disposed once its output has
     * arrived, matching the sliding-window variant of this test, so that it is not left behind
     * still holding its state.
     */
    @Test
    public void checkpointRestoreWithPendingWindowTumbling() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int windowSize = 200;
            final int numElementsFirst = 700;
            final int numElements = 1000;
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(windowSize);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, (long)windowSize, (long)windowSize);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            // first batch, before the snapshot
            for (int i = 0; i < numElementsFirst; ++i) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(i));
                }
                Thread.sleep(1L);
            }

            // take the snapshot under the lock so that no trigger can run in between
            StreamTaskState state;
            List resultAtSnapshot;
            synchronized (lock) {
                int beforeSnapShot = out.getElements().size();
                state = op.snapshotOperatorState(1L, System.currentTimeMillis());
                resultAtSnapshot = new ArrayList(out.getElements());
                int afterSnapShot = out.getElements().size();
                Assert.assertEquals("operator performed computation during snapshot", (long)beforeSnapShot, (long)afterSnapShot);
                Assert.assertTrue(afterSnapShot <= numElementsFirst);
            }

            // second batch goes to the original operator, which is then disposed without close()
            for (int i = numElementsFirst; i < numElements; ++i) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(i));
                }
                Thread.sleep(1L);
            }
            op.dispose();

            // restore a fresh operator from the snapshot and replay the second batch
            CollectingOutput out2 = new CollectingOutput(windowSize);
            op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, (long)windowSize, (long)windowSize);
            op.setup(mockTask, new StreamConfig(new Configuration()), out2);
            op.restoreState(state);
            op.open();
            for (int i = numElementsFirst; i < numElements; ++i) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(i));
                }
                Thread.sleep(1L);
            }

            out2.waitForNElements(numElements - resultAtSnapshot.size(), 60000L);

            // shut the restored operator down as well (this was missing before)
            synchronized (lock) {
                op.close();
            }
            op.dispose();

            // combined output must be exactly 0..numElements-1
            List finalResult = new ArrayList(resultAtSnapshot);
            finalResult.addAll(out2.getElements());
            Assert.assertEquals((long)numElements, (long)finalResult.size());
            Collections.sort(finalResult);
            for (int i = 0; i < numElements; ++i) {
                Assert.assertEquals((long)i, (long)((Integer)finalResult.get(i)).intValue());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Snapshot/restore test for a sliding window (size 200, slide 50, so four overlapping
     * windows per element).
     *
     * <p>Steps: feed 700 elements, take a snapshot under the lock (asserting that the snapshot
     * itself emits nothing), feed 300 more and dispose the operator without closing it, then
     * restore a fresh operator from the snapshot, feed elements 700..999 again, and verify that
     * the combined output contains each of 0..999 exactly four times.
     */
    @Test
    public void checkpointRestoreWithPendingWindowSliding() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int factor = 4;
            final int windowSlide = 50;
            final int windowSize = 200;
            final int numElements = 1000;
            final int numElementsFirst = 700;
            final int expectedTotal = factor * numElements;
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(windowSlide);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, (long)windowSize, (long)windowSlide);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            // first batch, before the snapshot
            int next = 0;
            while (next < numElementsFirst) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(next));
                }
                Thread.sleep(1L);
                ++next;
            }

            // take the snapshot under the lock so that no trigger can run in between
            StreamTaskState state;
            ArrayList resultAtSnapshot;
            synchronized (lock) {
                int beforeSnapShot = out.getElements().size();
                state = op.snapshotOperatorState(1L, System.currentTimeMillis());
                resultAtSnapshot = new ArrayList(out.getElements());
                int afterSnapShot = out.getElements().size();
                Assert.assertEquals("operator performed computation during snapshot", (long)beforeSnapShot, (long)afterSnapShot);
            }
            Assert.assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);

            // second batch goes to the original operator, which is then disposed without close()
            while (next < numElements) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(next));
                }
                Thread.sleep(1L);
                ++next;
            }
            op.dispose();

            // restore a fresh operator from the snapshot and replay the second batch
            CollectingOutput out2 = new CollectingOutput(windowSlide);
            op = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, (long)windowSize, (long)windowSlide);
            op.setup(mockTask, new StreamConfig(new Configuration()), out2);
            op.restoreState(state);
            op.open();
            for (int replayed = numElementsFirst; replayed < numElements; ++replayed) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(replayed));
                }
                Thread.sleep(1L);
            }

            // poll until all results have arrived or two minutes have passed
            final long deadline = System.currentTimeMillis() + 120000L;
            boolean keepWaiting;
            do {
                Thread.sleep(20L);
                boolean incomplete = resultAtSnapshot.size() + out2.getElements().size() < expectedTotal;
                keepWaiting = incomplete && System.currentTimeMillis() < deadline;
            } while (keepWaiting);

            synchronized (lock) {
                op.close();
            }
            op.dispose();

            // sorted combined output must be 0,0,0,0,1,1,1,1,...
            ArrayList finalResult = new ArrayList(resultAtSnapshot);
            finalResult.addAll(out2.getElements());
            Assert.assertEquals((long)expectedTotal, (long)finalResult.size());
            Collections.sort(finalResult);
            for (int position = 0; position < expectedTotal; ++position) {
                Assert.assertEquals((long)(position / factor), (long)((Integer)finalResult.get(position)).intValue());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Runs {@code StatefulFunction} inside an operator with window size and slide 50, adds
     * four elements for key 1 and four for key 2 in two batches, and verifies both the
     * forwarded elements and the per-key counts recorded in
     * {@code StatefulFunction.globalCounts}.
     */
    @Test
    public void testKeyValueStateInWindowFunction() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final Object lock = new Object();
            CollectingOutput out = new CollectingOutput(50);
            StreamTask<?, ?> mockTask = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTaskWithTimer(timerService, lock);

            // the counts live in a static map, so reset them before this run
            StatefulFunction.globalCounts.clear();

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator((WindowFunction)new StatefulFunction(), this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L);
            op.setup(mockTask, AccumulatingAlignedProcessingTimeWindowOperatorTest.createTaskConfig(this.identitySelector, IntSerializer.INSTANCE), out);
            op.open();

            synchronized (lock) {
                for (int value : new int[] {1, 2}) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(2, 60000L);

            synchronized (lock) {
                for (int value : new int[] {1, 2, 1, 1, 2, 2}) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(8, 60000L);

            List result = out.getElements();
            Assert.assertEquals(8L, (long)result.size());
            Collections.sort(result);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 1, 2, 2, 2, 2), result);
            Assert.assertEquals(4L, (long)StatefulFunction.globalCounts.get(1).intValue());
            Assert.assertEquals(4L, (long)StatefulFunction.globalCounts.get(2).intValue());

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Asserts that constructing the operator with the given window size and slide throws an
     * {@link IllegalArgumentException}; fails the test if construction succeeds or throws any
     * other exception.
     *
     * @param windowSize the window size to pass to the constructor
     * @param windowSlide the window slide to pass to the constructor
     */
    private void assertInvalidParameter(long windowSize, long windowSlide) {
        try {
            new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE, windowSize, windowSlide);
        } catch (IllegalArgumentException expected) {
            // this is the outcome the test wants
            return;
        } catch (Exception unexpected) {
            Assert.fail("Wrong exception. Expected IllegalArgumentException but found " + unexpected.getClass().getSimpleName());
        }
        // only reached when the constructor returned normally
        Assert.fail("This should fail with an IllegalArgumentException");
    }

    /**
     * Creates a Mockito mock of {@code StreamTask} that is stubbed with an accumulator map, a
     * task name, an execution config and a mocked {@code Environment}, and whose
     * {@code createStateBackend} answers with a new {@code MemoryStateBackend} initialized for
     * the requested operator identifier and key serializer.
     *
     * @return the stubbed mock task
     * @throws RuntimeException if stubbing {@code createStateBackend} fails; the failure is
     *         propagated with its cause rather than printed and ignored, so that a test never
     *         continues with a half-configured mock
     */
    private static StreamTask<?, ?> createMockTask() {
        StreamTask task = (StreamTask)Mockito.mock(StreamTask.class);
        Mockito.when((Object)task.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when((Object)task.getName()).thenReturn((Object)"Test task name");
        Mockito.when((Object)task.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
        final Environment env = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)env.getTaskInfo()).thenReturn((Object)new TaskInfo("Test task name", 0, 1, 0));
        Mockito.when((Object)env.getUserClassLoader()).thenReturn((Object)AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
        Mockito.when((Object)env.getMetricGroup()).thenReturn((Object)new UnregisteredTaskMetricsGroup());
        Mockito.when((Object)task.getEnvironment()).thenReturn((Object)env);
        try {
            ((StreamTask)Mockito.doAnswer((Answer)new Answer<AbstractStateBackend>(){

                @Override
                public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    // arguments of createStateBackend: operator identifier, key serializer
                    String operatorIdentifier = (String)invocationOnMock.getArguments()[0];
                    TypeSerializer keySerializer = (TypeSerializer)invocationOnMock.getArguments()[1];
                    MemoryStateBackend backend = MemoryStateBackend.create();
                    backend.initializeForJob(env, operatorIdentifier, keySerializer);
                    return backend;
                }
            }).when((Object)task)).createStateBackend((String)Mockito.any(String.class), (TypeSerializer)Mockito.any(TypeSerializer.class));
        } catch (Exception e) {
            throw new RuntimeException("Failed to stub createStateBackend on the mocked StreamTask", e);
        }
        return task;
    }

    /**
     * Creates the mock task from {@code createMockTask()} and additionally stubs
     * {@code registerTimer} so that each registration schedules a call to the target's
     * {@code trigger} method on the given executor, delayed until the requested timestamp and
     * executed while holding {@code lock}.
     *
     * @param timerService executor on which the trigger callbacks are scheduled
     * @param lock monitor that is held while a trigger callback runs
     * @return the stubbed mock task
     */
    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService timerService, final Object lock) {
        StreamTask<?, ?> task = AccumulatingAlignedProcessingTimeWindowOperatorTest.createMockTask();

        Answer<Void> scheduleTrigger = new Answer<Void>() {

            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                // arguments of registerTimer: timestamp, target
                Object[] args = invocation.getArguments();
                final Long timestamp = (Long)args[0];
                final Triggerable target = (Triggerable)args[1];

                Callable<Object> fireTrigger = new Callable<Object>() {

                    @Override
                    public Object call() throws Exception {
                        synchronized (lock) {
                            target.trigger(timestamp.longValue());
                        }
                        return null;
                    }
                };
                long delayMillis = timestamp - System.currentTimeMillis();
                timerService.schedule(fireTrigger, delayMillis, TimeUnit.MILLISECONDS);
                return null;
            }
        };

        ((StreamTask)Mockito.doAnswer((Answer)scheduleTrigger).when(task)).registerTimer(Mockito.anyLong(), (Triggerable)Mockito.any(Triggerable.class));
        return task;
    }

    /**
     * Builds a {@code StreamConfig} on top of an empty {@code Configuration}, with the given
     * key selector set as state partitioner at index 0 and the given serializer set as state
     * key serializer.
     *
     * @param partitioner key selector to register as state partitioner 0
     * @param keySerializer serializer to register for the state key
     * @return the populated stream config
     */
    private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
        final StreamConfig config = new StreamConfig(new Configuration());
        config.setStatePartitioner(0, partitioner);
        config.setStateKeySerializer(keySerializer);
        return config;
    }

    private static class StatefulFunction
    extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<Integer, Integer>();
        private ValueState<Integer> state;

        private StatefulFunction() {
        }

        public void open(Configuration parameters) {
            Assert.assertNotNull((Object)this.getRuntimeContext());
            this.state = this.getRuntimeContext().getState(new ValueStateDescriptor("totalCount", Integer.class, (Object)0));
        }

        public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
            for (Integer i : values) {
                this.state.update((Object)((Integer)this.state.value() + 1));
                globalCounts.put(key, (Integer)this.state.value());
                out.collect((Object)i);
            }
        }
    }

    private static class FailingFunction
    implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
        private final int failAfterElements;
        private int numElements;

        FailingFunction(int failAfterElements) {
            this.failAfterElements = failAfterElements;
        }

        public void apply(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
            for (Integer i : values) {
                out.collect((Object)i);
                ++this.numElements;
                if (this.numElements < this.failAfterElements) continue;
                throw new Exception("Artificial Test Exception");
            }
        }
    }
}

