package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.class */
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
    private final ReduceFunction<String> mockFunction = (ReduceFunction) Mockito.mock(ReduceFunction.class);
    private final KeySelector<String, String> mockKeySelector = (KeySelector) Mockito.mock(KeySelector.class);
    private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = new KeySelector<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.1
        public Integer getKey(Tuple2<Integer, Integer> tuple2) {
            return (Integer) tuple2.f0;
        }
    };
    private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.2
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) {
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    };
    private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}).createSerializer(new ExecutionConfig());
    private final Comparator<Tuple2<Integer, Integer>> tupleComparator = new Comparator<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.3
        @Override // java.util.Comparator
        public int compare(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) {
            int intValue = ((Integer) tuple2.f0).intValue() - ((Integer) tuple22.f0).intValue();
            return intValue != 0 ? intValue : ((Integer) tuple2.f1).intValue() - ((Integer) tuple22.f1).intValue();
        }
    };

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest$FailingFunction.class */
    private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
        private final int failAfterElements;
        private int numElements;

        FailingFunction(int i) {
            this.failAfterElements = i;
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
            this.numElements++;
            if (this.numElements >= this.failAfterElements) {
                throw new Exception("Artificial Test Exception");
            }
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest$StatefulFunction.class */
    private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap();
        private ValueState<Integer> state;

        private StatefulFunction() {
        }

        public void open(Configuration configuration) {
            Assert.assertNotNull(getRuntimeContext());
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("totalCount", Integer.class, 1));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
            this.state.update(Integer.valueOf(((Integer) this.state.value()).intValue() + 1));
            globalCounts.put(tuple2.f0, this.state.value());
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    public AggregatingAlignedProcessingTimeWindowOperatorTest() {
        ClosureCleaner.clean(this.fieldOneSelector, false);
        ClosureCleaner.clean(this.sumFunction, false);
    }

    @After
    public void checkNoTriggerThreadsRunning() {
        long currentTimeMillis = System.currentTimeMillis() + 5000;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < currentTimeMillis) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
            }
        }
        Assert.assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
    }

    @Test
    public void testInvalidParameters() {
        try {
            assertInvalidParameter(-1L, -1L);
            assertInvalidParameter(10000L, -1L);
            assertInvalidParameter(-1L, 1000L);
            assertInvalidParameter(1000L, 2000L);
            assertInvalidParameter(1000L, 999L);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testWindowSizeAndSlide() {
        try {
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000L, 1000L);
            Assert.assertEquals(5000L, aggregatingProcessingTimeWindowOperator.getWindowSize());
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator.getWindowSlide());
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator.getPaneSize());
            Assert.assertEquals(5L, aggregatingProcessingTimeWindowOperator.getNumPanesPerWindow());
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator2 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000L, 1000L);
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator2.getWindowSize());
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator2.getWindowSlide());
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator2.getPaneSize());
            Assert.assertEquals(1L, aggregatingProcessingTimeWindowOperator2.getNumPanesPerWindow());
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator3 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500L, 1000L);
            Assert.assertEquals(1500L, aggregatingProcessingTimeWindowOperator3.getWindowSize());
            Assert.assertEquals(1000L, aggregatingProcessingTimeWindowOperator3.getWindowSlide());
            Assert.assertEquals(500L, aggregatingProcessingTimeWindowOperator3.getPaneSize());
            Assert.assertEquals(3L, aggregatingProcessingTimeWindowOperator3.getNumPanesPerWindow());
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator4 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200L, 1100L);
            Assert.assertEquals(1200L, aggregatingProcessingTimeWindowOperator4.getWindowSize());
            Assert.assertEquals(1100L, aggregatingProcessingTimeWindowOperator4.getWindowSlide());
            Assert.assertEquals(100L, aggregatingProcessingTimeWindowOperator4.getPaneSize());
            Assert.assertEquals(12L, aggregatingProcessingTimeWindowOperator4.getNumPanesPerWindow());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testWindowTriggerTimeAlignment() {
        try {
            Output output = (Output) Mockito.mock(Output.class);
            StreamTask<?, ?> createMockTask = createMockTask();
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000L, 1000L);
            aggregatingProcessingTimeWindowOperator.setup(createMockTask, new StreamConfig(new Configuration()), output);
            aggregatingProcessingTimeWindowOperator.open();
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator.getNextSlideTime() % 1000 == 0);
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator.getNextEvaluationTime() % 1000 == 0);
            aggregatingProcessingTimeWindowOperator.dispose();
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator2 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000L, 1000L);
            aggregatingProcessingTimeWindowOperator2.setup(createMockTask, new StreamConfig(new Configuration()), output);
            aggregatingProcessingTimeWindowOperator2.open();
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator2.getNextSlideTime() % 1000 == 0);
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator2.getNextEvaluationTime() % 1000 == 0);
            aggregatingProcessingTimeWindowOperator2.dispose();
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator3 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500L, 1000L);
            aggregatingProcessingTimeWindowOperator3.setup(createMockTask, new StreamConfig(new Configuration()), output);
            aggregatingProcessingTimeWindowOperator3.open();
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator3.getNextSlideTime() % 500 == 0);
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator3.getNextEvaluationTime() % 1000 == 0);
            aggregatingProcessingTimeWindowOperator3.dispose();
            AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator4 = new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200L, 1100L);
            aggregatingProcessingTimeWindowOperator4.setup(createMockTask, new StreamConfig(new Configuration()), output);
            aggregatingProcessingTimeWindowOperator4.open();
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator4.getNextSlideTime() % 100 == 0);
            Assert.assertTrue(aggregatingProcessingTimeWindowOperator4.getNextEvaluationTime() % 1100 == 0);
            aggregatingProcessingTimeWindowOperator4.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTumblingWindowUniqueElements() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(50);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 50L, 50L);
                Object obj = new Object();
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj), new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 1000; i++) {
                    synchronized (obj) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(Integer.valueOf(i), Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    }
                    Thread.sleep(1L);
                }
                collectingOutput.waitForNElements(1000, 60000L);
                List elements = collectingOutput.getElements();
                Assert.assertEquals(1000L, elements.size());
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator.close();
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                Collections.sort(elements, this.tupleComparator);
                for (int i2 = 0; i2 < 1000; i2++) {
                    Assert.assertEquals(i2, ((Integer) ((Tuple2) elements.get(i2)).f0).intValue());
                    Assert.assertEquals(i2, ((Integer) ((Tuple2) elements.get(i2)).f1).intValue());
                }
                newSingleThreadScheduledExecutor.shutdownNow();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testTumblingWindowDuplicateElements() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(50);
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 50L, 50L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                long j = 0;
                int i = 1;
                while (i <= 10) {
                    synchronized (obj) {
                        long nextEvaluationTime = aggregatingProcessingTimeWindowOperator.getNextEvaluationTime();
                        int i2 = ((int) nextEvaluationTime) ^ ((int) (nextEvaluationTime >>> 32));
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(Integer.valueOf(i2), Integer.valueOf(i2)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                        if (nextEvaluationTime != j) {
                            i++;
                            j = nextEvaluationTime;
                        }
                    }
                    Thread.sleep(1L);
                }
                collectingOutput.waitForNElements(10, 60000L);
                List elements = collectingOutput.getElements();
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator.close();
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                Assert.assertTrue(elements.size() >= 10 && elements.size() <= 20);
                Assert.assertTrue(new HashSet(elements).size() == 10);
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void testSlidingWindow() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(50);
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 150L, 50L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 1000; i++) {
                    synchronized (obj) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(Integer.valueOf(i), Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    }
                    Thread.sleep(1L);
                }
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator.close();
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                List<Tuple2> elements = collectingOutput.getElements();
                if (elements.size() < 1000 || elements.size() > 3000) {
                    System.out.println(elements);
                    Assert.fail("Wrong number of results: " + elements.size());
                }
                Collections.sort(elements, this.tupleComparator);
                int i2 = -1;
                int i3 = -1;
                for (Tuple2 tuple2 : elements) {
                    Assert.assertEquals(tuple2.f0, tuple2.f1);
                    if (((Integer) tuple2.f0).intValue() == i2) {
                        i3++;
                        Assert.assertTrue(i3 <= 3);
                    } else {
                        i2 = ((Integer) tuple2.f0).intValue();
                        i3 = 1;
                    }
                }
                newSingleThreadScheduledExecutor.shutdownNow();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdownNow();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testSlidingWindowSingleElements() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(50);
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 150L, 50L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                synchronized (obj) {
                    StreamRecord streamRecord = new StreamRecord(new Tuple2(1, 1));
                    aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                    aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    StreamRecord streamRecord2 = new StreamRecord(new Tuple2(2, 2));
                    aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                    aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                }
                collectingOutput.waitForNElements(6, 120000L);
                List elements = collectingOutput.getElements();
                Assert.assertEquals(6L, elements.size());
                Collections.sort(elements, this.tupleComparator);
                Assert.assertEquals(Arrays.asList(new Tuple2(1, 1), new Tuple2(1, 1), new Tuple2(1, 1), new Tuple2(2, 2), new Tuple2(2, 2), new Tuple2(2, 2)), elements);
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator.close();
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void testPropagateExceptionsFromProcessElement() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput();
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(new FailingFunction(100), this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 3153600000000L, 3153600000000L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 100; i++) {
                    synchronized (obj) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(1, 1));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    }
                }
                try {
                    StreamRecord streamRecord2 = new StreamRecord(new Tuple2(1, 1));
                    aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                    aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                    Assert.fail("This fail with an exception");
                } catch (Exception e) {
                    Assert.assertTrue(e.getMessage().contains("Artificial Test Exception"));
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e2) {
                e2.printStackTrace();
                Assert.fail(e2.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void checkpointRestoreWithPendingWindowTumbling() {
        StreamTaskState snapshotOperatorState;
        ArrayList arrayList;
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(200);
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 200L, 200L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 700; i++) {
                    synchronized (obj) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(Integer.valueOf(i), Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    }
                    Thread.sleep(1L);
                }
                synchronized (obj) {
                    int size = collectingOutput.getElements().size();
                    snapshotOperatorState = aggregatingProcessingTimeWindowOperator.snapshotOperatorState(1L, System.currentTimeMillis());
                    arrayList = new ArrayList(collectingOutput.getElements());
                    Assert.assertEquals("operator performed computation during snapshot", size, collectingOutput.getElements().size());
                }
                Assert.assertTrue(arrayList.size() <= 700);
                for (int i2 = 700; i2 < 1000; i2++) {
                    synchronized (obj) {
                        StreamRecord streamRecord2 = new StreamRecord(new Tuple2(Integer.valueOf(i2), Integer.valueOf(i2)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                    }
                    Thread.sleep(1L);
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                CollectingOutput collectingOutput2 = new CollectingOutput(200);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator2 = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 200L, 200L);
                aggregatingProcessingTimeWindowOperator2.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput2);
                aggregatingProcessingTimeWindowOperator2.restoreState(snapshotOperatorState, 1L);
                aggregatingProcessingTimeWindowOperator2.open();
                for (int i3 = 700; i3 < 1000; i3++) {
                    synchronized (obj) {
                        StreamRecord streamRecord3 = new StreamRecord(new Tuple2(Integer.valueOf(i3), Integer.valueOf(i3)));
                        aggregatingProcessingTimeWindowOperator2.setKeyContextElement1(streamRecord3);
                        aggregatingProcessingTimeWindowOperator2.processElement(streamRecord3);
                    }
                    Thread.sleep(1L);
                }
                collectingOutput2.waitForNElements(1000 - arrayList.size(), 60000L);
                ArrayList arrayList2 = new ArrayList(arrayList);
                arrayList2.addAll(collectingOutput2.getElements());
                Assert.assertEquals(1000L, arrayList2.size());
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator2.close();
                }
                aggregatingProcessingTimeWindowOperator2.dispose();
                Collections.sort(arrayList2, this.tupleComparator);
                for (int i4 = 0; i4 < 1000; i4++) {
                    Assert.assertEquals(i4, ((Integer) ((Tuple2) arrayList2.get(i4)).f0).intValue());
                    Assert.assertEquals(i4, ((Integer) ((Tuple2) arrayList2.get(i4)).f1).intValue());
                }
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void checkpointRestoreWithPendingWindowSliding() {
        StreamTaskState snapshotOperatorState;
        ArrayList arrayList;
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput(50);
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 200L, 50L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 700; i++) {
                    synchronized (obj) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(Integer.valueOf(i), Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                    }
                    Thread.sleep(1L);
                }
                synchronized (obj) {
                    int size = collectingOutput.getElements().size();
                    snapshotOperatorState = aggregatingProcessingTimeWindowOperator.snapshotOperatorState(1L, System.currentTimeMillis());
                    arrayList = new ArrayList(collectingOutput.getElements());
                    Assert.assertEquals("operator performed computation during snapshot", size, collectingOutput.getElements().size());
                }
                Assert.assertTrue(arrayList.size() <= 2800);
                for (int i2 = 700; i2 < 1000; i2++) {
                    synchronized (obj) {
                        StreamRecord streamRecord2 = new StreamRecord(new Tuple2(Integer.valueOf(i2), Integer.valueOf(i2)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                    }
                    Thread.sleep(1L);
                }
                aggregatingProcessingTimeWindowOperator.dispose();
                CollectingOutput collectingOutput2 = new CollectingOutput(50);
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator2 = new AggregatingProcessingTimeWindowOperator(this.sumFunction, this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 200L, 50L);
                aggregatingProcessingTimeWindowOperator2.setup(createMockTaskWithTimer, new StreamConfig(new Configuration()), collectingOutput2);
                aggregatingProcessingTimeWindowOperator2.restoreState(snapshotOperatorState, 1L);
                aggregatingProcessingTimeWindowOperator2.open();
                for (int i3 = 700; i3 < 1000; i3++) {
                    synchronized (obj) {
                        StreamRecord streamRecord3 = new StreamRecord(new Tuple2(Integer.valueOf(i3), Integer.valueOf(i3)));
                        aggregatingProcessingTimeWindowOperator2.setKeyContextElement1(streamRecord3);
                        aggregatingProcessingTimeWindowOperator2.processElement(streamRecord3);
                    }
                    Thread.sleep(1L);
                }
                long currentTimeMillis = System.currentTimeMillis() + 120000;
                do {
                    Thread.sleep(20L);
                    if (arrayList.size() + collectingOutput2.getElements().size() >= 4000) {
                        break;
                    }
                } while (System.currentTimeMillis() < currentTimeMillis);
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator2.close();
                }
                aggregatingProcessingTimeWindowOperator2.dispose();
                ArrayList arrayList2 = new ArrayList(arrayList);
                arrayList2.addAll(collectingOutput2.getElements());
                Assert.assertEquals(4000L, arrayList2.size());
                Collections.sort(arrayList2, this.tupleComparator);
                for (int i4 = 0; i4 < 4000; i4++) {
                    Assert.assertEquals(i4 / 4, ((Integer) ((Tuple2) arrayList2.get(i4)).f0).intValue());
                    Assert.assertEquals(i4 / 4, ((Integer) ((Tuple2) arrayList2.get(i4)).f1).intValue());
                }
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void testKeyValueStateInWindowFunctionTumbling() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput();
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                StatefulFunction.globalCounts.clear();
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(new StatefulFunction(), this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 2000L, 2000L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, createTaskConfig(this.fieldOneSelector, IntSerializer.INSTANCE), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                synchronized (obj) {
                    for (int i = 0; i < 10; i++) {
                        StreamRecord streamRecord = new StreamRecord(new Tuple2(1, Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                        StreamRecord streamRecord2 = new StreamRecord(new Tuple2(2, Integer.valueOf(i)));
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                    }
                }
                while (true) {
                    if (StatefulFunction.globalCounts.get(1).intValue() >= 10 && StatefulFunction.globalCounts.get(2).intValue() >= 10) {
                        aggregatingProcessingTimeWindowOperator.close();
                        aggregatingProcessingTimeWindowOperator.dispose();
                        newSingleThreadScheduledExecutor.shutdown();
                        return;
                    }
                    Thread.sleep(50L);
                }
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @Test
    public void testKeyValueStateInWindowFunctionSliding() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            try {
                CollectingOutput collectingOutput = new CollectingOutput();
                Object obj = new Object();
                StreamTask<?, ?> createMockTaskWithTimer = createMockTaskWithTimer(newSingleThreadScheduledExecutor, obj);
                StatefulFunction.globalCounts.clear();
                AggregatingProcessingTimeWindowOperator aggregatingProcessingTimeWindowOperator = new AggregatingProcessingTimeWindowOperator(new StatefulFunction(), this.fieldOneSelector, IntSerializer.INSTANCE, this.tupleSerializer, 100L, 50L);
                aggregatingProcessingTimeWindowOperator.setup(createMockTaskWithTimer, createTaskConfig(this.fieldOneSelector, IntSerializer.INSTANCE), collectingOutput);
                aggregatingProcessingTimeWindowOperator.open();
                for (int i = 0; i < 100; i++) {
                    StreamRecord streamRecord = new StreamRecord(new Tuple2(1, Integer.valueOf(i)));
                    StreamRecord streamRecord2 = new StreamRecord(new Tuple2(2, Integer.valueOf(i)));
                    StreamRecord streamRecord3 = new StreamRecord(new Tuple2(1, Integer.valueOf(i)));
                    StreamRecord streamRecord4 = new StreamRecord(new Tuple2(2, Integer.valueOf(i)));
                    synchronized (obj) {
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord);
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord2);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord2);
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord3);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord3);
                        aggregatingProcessingTimeWindowOperator.setKeyContextElement1(streamRecord4);
                        aggregatingProcessingTimeWindowOperator.processElement(streamRecord4);
                    }
                    Thread.sleep(1L);
                }
                synchronized (obj) {
                    aggregatingProcessingTimeWindowOperator.close();
                }
                int intValue = StatefulFunction.globalCounts.get(1).intValue();
                int intValue2 = StatefulFunction.globalCounts.get(2).intValue();
                Assert.assertTrue(intValue >= 2 && intValue <= 200);
                Assert.assertEquals(intValue, intValue2);
                aggregatingProcessingTimeWindowOperator.dispose();
                newSingleThreadScheduledExecutor.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                newSingleThreadScheduledExecutor.shutdown();
            }
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    private void assertInvalidParameter(long j, long j2) {
        try {
            new AggregatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, j, j2);
            Assert.fail("This should fail with an IllegalArgumentException");
        } catch (IllegalArgumentException e) {
        } catch (Exception e2) {
            Assert.fail("Wrong exception. Expected IllegalArgumentException but found " + e2.getClass().getSimpleName());
        }
    }

    private static StreamTask<?, ?> createMockTask() {
        StreamTask<?, ?> streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when(streamTask.getName()).thenReturn("Test task name");
        Mockito.when(streamTask.getExecutionConfig()).thenReturn(new ExecutionConfig());
        final Environment environment = (Environment) Mockito.mock(Environment.class);
        Mockito.when(environment.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 0, 1, 0));
        Mockito.when(environment.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
        Mockito.when(streamTask.getEnvironment()).thenReturn(environment);
        try {
            ((StreamTask) Mockito.doAnswer(new Answer<AbstractStateBackend>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.4
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public AbstractStateBackend m26answer(InvocationOnMock invocationOnMock) throws Throwable {
                    String str = (String) invocationOnMock.getArguments()[0];
                    TypeSerializer typeSerializer = (TypeSerializer) invocationOnMock.getArguments()[1];
                    MemoryStateBackend create = MemoryStateBackend.create();
                    create.initializeForJob(environment, str, typeSerializer);
                    return create;
                }
            }).when(streamTask)).createStateBackend((String) Matchers.any(String.class), (TypeSerializer) Matchers.any(TypeSerializer.class));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return streamTask;
    }

    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService scheduledExecutorService, final Object obj) {
        StreamTask<?, ?> createMockTask = createMockTask();
        ((StreamTask) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.5
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m27answer(InvocationOnMock invocationOnMock) throws Throwable {
                final Long l = (Long) invocationOnMock.getArguments()[0];
                final Triggerable triggerable = (Triggerable) invocationOnMock.getArguments()[1];
                scheduledExecutorService.schedule(new Callable<Object>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AggregatingAlignedProcessingTimeWindowOperatorTest.5.1
                    @Override // java.util.concurrent.Callable
                    public Object call() throws Exception {
                        synchronized (obj) {
                            triggerable.trigger(l.longValue());
                        }
                        return null;
                    }
                }, l.longValue() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return null;
            }
        }).when(createMockTask)).registerTimer(Matchers.anyLong(), (Triggerable) Matchers.any(Triggerable.class));
        return createMockTask;
    }

    private static StreamConfig createTaskConfig(KeySelector<?, ?> keySelector, TypeSerializer<?> typeSerializer) {
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStatePartitioner(0, keySelector);
        streamConfig.setStateKeySerializer(typeSerializer);
        return streamConfig;
    }
}
