package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.class */
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
    /**
     * Mockito mock of a window function. In the visible tests it is only passed to operators whose
     * size/slide bookkeeping and trigger times are inspected.
     */
    @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
    private final WindowFunction<String, String, String, TimeWindow> mockFunction =
            Mockito.mock(WindowFunction.class);

    /** Mockito mock of a key selector, passed to operators together with {@code mockFunction}. */
    @SuppressWarnings("unchecked") // Mockito can only hand back the raw mocked type
    private final KeySelector<String, String> mockKeySelector =
            Mockito.mock(KeySelector.class);
    /**
     * Key selector that returns each integer element as its own key.
     * Kept as an anonymous class because the test constructor passes it to {@code ClosureCleaner}.
     */
    private final KeySelector<Integer, Integer> identitySelector =
            new KeySelector<Integer, Integer>() {
                public Integer getKey(Integer value) {
                    return value;
                }
            };
    /**
     * Window function that forwards every element unchanged, after asserting that the element
     * equals the key of the window it was delivered in (the tests key elements by their own value).
     *
     * <p>No explicit bridge method is declared: the compiler generates the erased
     * {@code apply(Object, Window, Iterable, Collector)} overload on its own, and spelling it out
     * by hand clashes with the interface method's erasure.
     */
    private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
            new WindowFunction<Integer, Integer, Integer, TimeWindow>() {
                public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
                    for (Integer value : values) {
                        Assert.assertEquals(key, value);
                        out.collect(value);
                    }
                }
            };

    /**
     * Window function that forwards elements until a configured total number of elements has been
     * emitted and then throws, so that tests can check how function failures are surfaced.
     *
     * <p>The compiler-generated bridge for the erased {@code apply} signature is intentionally not
     * written out by hand; declaring it explicitly clashes with the interface method's erasure.
     */
    private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
        /** Total emitted-element count at which {@code apply} throws. */
        private final int failAfterElements;

        /** Number of elements emitted so far, accumulated across all {@code apply} invocations. */
        private int numElements;

        /**
         * @param failAfterElements total number of emitted elements after which {@code apply} throws
         */
        FailingFunction(int failAfterElements) {
            this.failAfterElements = failAfterElements;
        }

        /**
         * Emits the window's elements one by one and throws once the running total reaches the
         * configured threshold. The element that reaches the threshold is still emitted first.
         *
         * @throws Exception with message {@code "Artificial Test Exception"} when the threshold is hit
         */
        public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
            for (Integer value : values) {
                out.collect(value);
                numElements++;
                if (numElements >= failAfterElements) {
                    throw new Exception("Artificial Test Exception");
                }
            }
        }
    }

    /**
     * Rich window function that counts, in key/value state named {@code "totalCount"}, how many
     * elements it has seen, and mirrors the latest count per window key into {@link #globalCounts}.
     *
     * <p>The compiler-generated bridge for the erased {@code apply} signature is intentionally not
     * written out by hand; declaring it explicitly clashes with the inherited method's erasure.
     */
    private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
        /** Latest state value observed per window key; static so tests can read it from outside. */
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();

        /** Counter state obtained from the runtime context in {@link #open(Configuration)}. */
        private OperatorState<Integer> state;

        private StatefulFunction() {
        }

        /** Checks that a runtime context is present and acquires the counter state (default 0). */
        public void open(Configuration configuration) {
            Assert.assertNotNull(getRuntimeContext());
            this.state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 0);
        }

        /**
         * For every element: increments the counter state, publishes the new value under the window
         * key in {@link #globalCounts}, and forwards the element.
         */
        public void apply(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
            for (Integer value : values) {
                state.update(state.value() + 1);
                globalCounts.put(key, state.value());
                out.collect(value);
            }
        }
    }

    public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
        ClosureCleaner.clean(this.identitySelector, false);
        ClosureCleaner.clean(this.validatingIdentityFunction, false);
    }

    /**
     * After each test, waits up to five seconds for all threads in
     * {@code StreamTask.TRIGGER_THREAD_GROUP} to terminate and fails the test if any is still active.
     */
    @After
    public void checkNoTriggerThreadsRunning() {
        final long deadline = System.currentTimeMillis() + 5000;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag for the caller and stop
                // polling (a further sleep would throw again immediately). The assertion below
                // still reports threads that are alive at this point.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Assert.assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
    }

    /**
     * Feeds a fixed list of argument pairs to {@code assertInvalidParameter}; any exception escaping
     * that helper fails the test.
     * NOTE(review): the pairs are presumably (window size, window slide); confirm against the helper.
     */
    @Test
    public void testInvalidParameters() {
        final long[][] rejectedCombinations = {
            {-1L, -1L},
            {10000L, -1L},
            {-1L, 1000L},
            {1000L, 2000L},
            {1000L, 999L},
        };
        try {
            for (long[] combination : rejectedCombinations) {
                assertInvalidParameter(combination[0], combination[1]);
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Constructs operators for several size/slide combinations and checks the reported window size,
     * window slide, pane size and number of panes per window.
     */
    @Test
    public void testWindowSizeAndSlide() {
        // Each row: {window size, window slide, expected pane size, expected panes per window}.
        final long[][] geometries = {
            {5000L, 1000L, 1000L, 5L},
            {1000L, 1000L, 1000L, 1L},
            {1500L, 1000L, 500L, 3L},
            {1200L, 1100L, 100L, 12L},
        };
        try {
            for (long[] geometry : geometries) {
                final long size = geometry[0];
                final long slide = geometry[1];
                AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                        this.mockFunction, this.mockKeySelector,
                        StringSerializer.INSTANCE, StringSerializer.INSTANCE,
                        size, slide);
                Assert.assertEquals(size, op.getWindowSize());
                Assert.assertEquals(slide, op.getWindowSlide());
                Assert.assertEquals(geometry[2], op.getPaneSize());
                Assert.assertEquals(geometry[3], op.getNumPanesPerWindow());
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Opens operators for several size/slide combinations against a mocked task and output and
     * checks that the next slide time and the next evaluation time are whole multiples of the
     * expected values. Each operator is disposed before the next one is created.
     */
    @Test
    public void testWindowTriggerTimeAlignment() {
        // Each row: {window size, window slide, divisor of next slide time, divisor of next evaluation time}.
        final long[][] scenarios = {
            {5000L, 1000L, 1000L, 1000L},
            {1000L, 1000L, 1000L, 1000L},
            {1500L, 1000L, 500L, 1000L},
            {1200L, 1100L, 100L, 1100L},
        };
        try {
            Output mockOutput = (Output) Mockito.mock(Output.class);
            StreamTask<?, ?> mockTask = createMockTask();

            for (long[] scenario : scenarios) {
                AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                        this.mockFunction, this.mockKeySelector,
                        StringSerializer.INSTANCE, StringSerializer.INSTANCE,
                        scenario[0], scenario[1]);
                op.setup(mockTask, new StreamConfig(new Configuration()), mockOutput);
                op.open();
                Assert.assertTrue(op.getNextSlideTime() % scenario[2] == 0);
                Assert.assertTrue(op.getNextEvaluationTime() % scenario[3] == 0);
                op.dispose();
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Pushes the integers 0..999 (with a 1 ms pause after each) through an operator whose window
     * size and slide are both 50, closes it, and checks that the collected output holds every
     * value exactly once.
     */
    @Test
    public void testTumblingWindow() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int numElements = 1000;
            CollectingOutput out = new CollectingOutput(50);
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    50L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            for (int value = 0; value < numElements; value++) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            synchronized (lock) {
                op.close();
            }
            op.dispose();

            // After sorting, position k must hold the value k.
            List result = out.getElements();
            Assert.assertEquals(numElements, result.size());
            Collections.sort(result);
            for (int expected = 0; expected < numElements; expected++) {
                Assert.assertEquals(expected, ((Integer) result.get(expected)).intValue());
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Pushes the integers 0..999 (with a 1 ms pause after each) through an operator with window
     * size 150 and slide 50, closes it, and checks that between 1000 and 3000 results were emitted
     * and that no value occurs more than three times.
     */
    @Test
    public void testSlidingWindow() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int numElements = 1000;
            CollectingOutput out = new CollectingOutput(50);
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            for (int value = 0; value < numElements; value++) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            synchronized (lock) {
                op.close();
            }
            op.dispose();

            List result = out.getElements();
            final int resultCount = result.size();
            if (resultCount < 1000 || resultCount > 3000) {
                Assert.fail("Wrong number of results: " + resultCount);
            }

            // Walk the sorted output and count the length of each run of equal values.
            Collections.sort(result);
            int previous = -1;
            int occurrences = -1;
            for (Object element : result) {
                final int current = ((Integer) element).intValue();
                if (current != previous) {
                    previous = current;
                    occurrences = 1;
                } else {
                    occurrences++;
                    Assert.assertTrue(occurrences <= 3);
                }
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Sends three small batches (2, 3 and 1 elements) into an operator with window size and slide
     * of 50, waiting after each batch until the output has received everything sent so far, then
     * checks that exactly the values 1..6 were emitted.
     */
    @Test
    public void testTumblingWindowSingleElements() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CollectingOutput out = new CollectingOutput(50);
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    50L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            // First batch.
            synchronized (lock) {
                op.processElement(new StreamRecord(1));
                op.processElement(new StreamRecord(2));
            }
            out.waitForNElements(2, 60000L);

            // Second batch.
            synchronized (lock) {
                op.processElement(new StreamRecord(3));
                op.processElement(new StreamRecord(4));
                op.processElement(new StreamRecord(5));
            }
            out.waitForNElements(5, 60000L);

            // Third batch.
            synchronized (lock) {
                op.processElement(new StreamRecord(6));
            }
            out.waitForNElements(6, 60000L);

            List result = out.getElements();
            Assert.assertEquals(6L, result.size());
            Collections.sort(result);
            Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Sends the two elements 1 and 2 into an operator with window size 150 and slide 50, waits
     * until six results have arrived, and checks that each element was emitted exactly three times.
     */
    @Test
    public void testSlidingWindowSingleElements() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CollectingOutput out = new CollectingOutput(50);
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    150L, 50L);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            synchronized (lock) {
                op.processElement(new StreamRecord(1));
                op.processElement(new StreamRecord(2));
            }
            out.waitForNElements(6, 120000L);

            List result = out.getElements();
            Assert.assertEquals(6L, result.size());
            Collections.sort(result);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Sends ten elements into an operator whose window size and slide are 31536000000 (one year if
     * the unit is milliseconds; unit not visible here), closes the operator right away, and checks
     * that all ten elements were emitted.
     */
    @Test
    public void testEmitTrailingDataOnClose() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final long hugeWindow = 31536000000L;
            CollectingOutput out = new CollectingOutput();
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    hugeWindow, hugeWindow);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            for (Integer value : input) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(value));
                }
            }

            synchronized (lock) {
                op.close();
            }
            op.dispose();

            // The input list is already ascending, so the sorted output must equal it.
            List result = out.getElements();
            Collections.sort(result);
            Assert.assertEquals(input, result);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Sends 150 elements into an operator that uses a {@code FailingFunction} configured to throw
     * after 100 emitted elements, and whose window is too long to fire during the test, then checks
     * that {@code close()} throws an exception carrying the function's message, either directly or
     * on its cause.
     */
    @Test
    public void testPropagateExceptionsFromClose() {
        final String expectedMessage = "Artificial Test Exception";
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final long hugeWindow = 3153600000000L;
            CollectingOutput out = new CollectingOutput();
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    new FailingFunction(100), this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    hugeWindow, hugeWindow);
            op.setup(mockTask, new StreamConfig(new Configuration()), out);
            op.open();

            for (int value = 0; value < 150; value++) {
                synchronized (lock) {
                    op.processElement(new StreamRecord(Integer.valueOf(value)));
                }
            }

            try {
                synchronized (lock) {
                    op.close();
                }
                Assert.fail("This should fail with an exception");
            } catch (Exception e) {
                // getMessage() may legitimately be null (on the exception or on its cause), so
                // guard both lookups instead of letting a NullPointerException mask the result.
                final String message = e.getMessage();
                final Throwable cause = e.getCause();
                final String causeMessage = cause == null ? null : cause.getMessage();
                final boolean carriesExpectedMessage =
                        (message != null && message.contains(expectedMessage))
                                || (causeMessage != null && causeMessage.contains(expectedMessage));
                Assert.assertTrue("Unexpected exception from close(): " + e, carriesExpectedMessage);
            }

            op.dispose();
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Snapshot/restore round trip for an operator with window size and slide of 200.
     *
     * <p>Steps: feed 0..699 into a first operator; under the lock, snapshot its state and copy what
     * it has emitted so far (asserting the snapshot itself emitted nothing); feed 700..999 into the
     * first operator and dispose it without closing; restore a second operator from the snapshot,
     * feed it 700..999 and close it. The output copied at snapshot time plus the second operator's
     * output must contain each of 0..999 exactly once.
     */
    @Test
    public void checkpointRestoreWithPendingWindowTumbling() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final int numElementsFirst = 700;
            final int numElements = 1000;

            CollectingOutput firstOutput = new CollectingOutput(200);
            // Lock handed to the mock task; presumably the one its timer callbacks take.
            // TODO confirm against createMockTaskWithTimer.
            final Object lock = new Object();
            StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            AccumulatingProcessingTimeWindowOperator firstOp = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    200L, 200L);
            firstOp.setup(mockTask, new StreamConfig(new Configuration()), firstOutput);
            firstOp.open();

            for (int value = 0; value < numElementsFirst; value++) {
                synchronized (lock) {
                    firstOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            // Take the snapshot and copy the output inside one critical section.
            final StreamTaskState checkpoint;
            final ArrayList emittedAtCheckpoint;
            synchronized (lock) {
                int countBefore = firstOutput.getElements().size();
                checkpoint = firstOp.snapshotOperatorState(1L, System.currentTimeMillis());
                emittedAtCheckpoint = new ArrayList(firstOutput.getElements());
                int countAfter = firstOutput.getElements().size();
                Assert.assertEquals("operator performed computation during snapshot", countBefore, countAfter);
                Assert.assertTrue(countAfter <= 700);
            }

            // Elements sent to the first operator after the snapshot; its later output is not used.
            for (int value = numElementsFirst; value < numElements; value++) {
                synchronized (lock) {
                    firstOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }
            firstOp.dispose();

            // Second operator, restored from the snapshot, receives the post-snapshot elements.
            CollectingOutput secondOutput = new CollectingOutput(200);
            AccumulatingProcessingTimeWindowOperator secondOp = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE,
                    200L, 200L);
            secondOp.setup(mockTask, new StreamConfig(new Configuration()), secondOutput);
            secondOp.restoreState(checkpoint);
            secondOp.open();

            for (int value = numElementsFirst; value < numElements; value++) {
                synchronized (lock) {
                    secondOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            synchronized (lock) {
                secondOp.close();
            }
            secondOp.dispose();

            // After sorting, position k of the combined output must hold the value k.
            ArrayList combined = new ArrayList(emittedAtCheckpoint);
            combined.addAll(secondOutput.getElements());
            Assert.assertEquals(numElements, combined.size());
            Collections.sort(combined);
            for (int expected = 0; expected < numElements; expected++) {
                Assert.assertEquals(expected, ((Integer) combined.get(expected)).intValue());
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Verifies snapshot/restore of a sliding-window operator while windows are still pending.
     *
     * <p>A first operator (200 ms windows sliding every 50 ms) receives the values 0..699 and is
     * then snapshotted under the checkpoint lock; the output collected up to that moment is
     * copied. The values 700..999 are fed to it afterwards, but whatever it emits after the
     * snapshot is discarded together with the operator. A second operator is restored from the
     * snapshot, receives 700..999 again, and the combined output (pre-snapshot copy plus the
     * second operator's output) must contain every value 0..999 exactly four times.
     */
    @Test
    public void checkpointRestoreWithPendingWindowSliding() {
        // The assertions below expect every value four times (windowSize / windowSlide).
        final int factor = 4;
        final int windowSlide = 50;
        final long windowSize = 200L;
        final int numElementsFirst = 700;
        final int numElements = 1000;

        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final CollectingOutput firstOut = new CollectingOutput(windowSlide);
            final Object lock = new Object();
            final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            final AccumulatingProcessingTimeWindowOperator firstOp = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide);
            firstOp.setup(mockTask, new StreamConfig(new Configuration()), firstOut);
            firstOp.open();

            // First part of the input, processed before the snapshot is taken.
            for (int value = 0; value < numElementsFirst; value++) {
                synchronized (lock) {
                    firstOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            // Snapshot under the lock so that no timer callback can run concurrently.
            final StreamTaskState state;
            final List resultAtSnapshot;
            synchronized (lock) {
                final int emittedBefore = firstOut.getElements().size();
                state = firstOp.snapshotOperatorState(1L, System.currentTimeMillis());
                resultAtSnapshot = new ArrayList(firstOut.getElements());
                Assert.assertEquals("operator performed computation during snapshot", emittedBefore, firstOut.getElements().size());
            }
            Assert.assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);

            // These elements reach the first operator only; its post-snapshot output is dropped.
            for (int value = numElementsFirst; value < numElements; value++) {
                synchronized (lock) {
                    firstOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }
            firstOp.dispose();

            // Second operator, restored from the snapshot, replays the remaining input.
            final CollectingOutput restoredOut = new CollectingOutput(windowSlide);
            final AccumulatingProcessingTimeWindowOperator restoredOp = new AccumulatingProcessingTimeWindowOperator(
                    this.validatingIdentityFunction, this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide);
            restoredOp.setup(mockTask, new StreamConfig(new Configuration()), restoredOut);
            restoredOp.restoreState(state);
            restoredOp.open();

            for (int value = numElementsFirst; value < numElements; value++) {
                synchronized (lock) {
                    restoredOp.processElement(new StreamRecord(Integer.valueOf(value)));
                }
                Thread.sleep(1L);
            }

            // Poll until all expected results have arrived or two minutes have passed.
            final long deadline = System.currentTimeMillis() + 120000;
            boolean complete;
            do {
                Thread.sleep(20L);
                complete = resultAtSnapshot.size() + restoredOut.getElements().size() >= factor * numElements;
            } while (!complete && System.currentTimeMillis() < deadline);

            synchronized (lock) {
                restoredOp.close();
            }
            restoredOp.dispose();

            // Combined output: every value 0..999 must be present exactly `factor` times.
            final List finalResult = new ArrayList(resultAtSnapshot);
            finalResult.addAll(restoredOut.getElements());
            Assert.assertEquals(factor * numElements, finalResult.size());
            Collections.sort(finalResult);
            for (int idx = 0; idx < factor * numElements; idx++) {
                Assert.assertEquals(idx / factor, ((Integer) finalResult.get(idx)).intValue());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Runs a {@code StatefulFunction} inside a tumbling-window operator (50 ms size and slide)
     * configured with a key selector and key serializer, and checks both the emitted elements
     * and the per-key totals recorded in {@code StatefulFunction.globalCounts}.
     *
     * <p>Two elements (keys 1 and 2) are sent first; after both have been emitted, six more
     * follow. In the end eight elements must have been emitted and each key's global count
     * must be four.
     */
    @Test
    public void testKeyValueStateInWindowFunction() {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            final CollectingOutput out = new CollectingOutput(50);
            final Object lock = new Object();
            final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

            // Start from clean static counters, independent of earlier runs.
            StatefulFunction.globalCounts.clear();

            final AccumulatingProcessingTimeWindowOperator op = new AccumulatingProcessingTimeWindowOperator(
                    new StatefulFunction(), this.identitySelector,
                    IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L);
            op.setup(mockTask, createTaskConfig(this.identitySelector, IntSerializer.INSTANCE), out);
            op.open();

            // First batch: one element per key.
            final int[] firstBatch = {1, 2};
            synchronized (lock) {
                for (int value : firstBatch) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(2, 60000L);

            // Second batch: three more elements per key, in the original interleaving.
            final int[] secondBatch = {1, 2, 1, 1, 2, 2};
            synchronized (lock) {
                for (int value : secondBatch) {
                    op.processElement(new StreamRecord(value));
                }
            }
            out.waitForNElements(8, 60000L);

            final List emitted = out.getElements();
            Assert.assertEquals(8L, emitted.size());
            Collections.sort(emitted);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 1, 2, 2, 2, 2), emitted);

            Assert.assertEquals(4L, StatefulFunction.globalCounts.get(1).intValue());
            Assert.assertEquals(4L, StatefulFunction.globalCounts.get(2).intValue());

            synchronized (lock) {
                op.close();
            }
            op.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            timerService.shutdown();
        }
    }

    /**
     * Asserts that constructing an {@code AccumulatingProcessingTimeWindowOperator} with the
     * given two numeric arguments is rejected with an {@link IllegalArgumentException}.
     *
     * <p>The test fails if construction succeeds or if a different exception type is thrown.
     *
     * @param j  first numeric constructor argument to probe
     * @param j2 second numeric constructor argument to probe
     */
    private void assertInvalidParameter(long j, long j2) {
        try {
            new AccumulatingProcessingTimeWindowOperator(
                    this.mockFunction, this.mockKeySelector,
                    StringSerializer.INSTANCE, StringSerializer.INSTANCE, j, j2);
        } catch (IllegalArgumentException expected) {
            // The arguments were rejected as required; nothing more to check.
            return;
        } catch (Exception unexpected) {
            Assert.fail("Wrong exception. Expected IllegalArgumentException but found " + unexpected.getClass().getSimpleName());
        }
        // Reached only when the constructor completed without throwing.
        Assert.fail("This should fail with an IllegalArgumentException");
    }

    /**
     * Creates a Mockito mock of {@link StreamTask} with the stubs the operators under test need.
     *
     * <p>The mock returns an empty accumulator map, a fixed task name, a fresh
     * {@link ExecutionConfig}, the default {@link MemoryStateBackend}, and a mocked
     * {@link Environment} that reports subtask index 0 of 1 and this test class's class loader
     * as the user class loader.
     *
     * @return the stubbed task mock
     */
    private static StreamTask<?, ?> createMockTask() {
        StreamTask<?, ?> task = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(task.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when(task.getName()).thenReturn("Test task name");
        Mockito.when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());

        Environment env = (Environment) Mockito.mock(Environment.class);
        Mockito.when(env.getIndexInSubtaskGroup()).thenReturn(0);
        Mockito.when(env.getNumberOfSubtasks()).thenReturn(1);
        // Use this class's own loader instead of reaching into the sibling "Aggregating" test class.
        Mockito.when(env.getUserClassLoader()).thenReturn(AccumulatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());

        Mockito.when(task.getEnvironment()).thenReturn(env);
        Mockito.when(task.getStateBackend()).thenReturn(MemoryStateBackend.defaultInstance());
        return task;
    }

    /**
     * Creates the mock task from {@link #createMockTask()} and additionally stubs
     * {@code registerTimer(long, Triggerable)} so that timers actually fire.
     *
     * <p>Each registration schedules a job on the given executor with a delay of
     * {@code timestamp - System.currentTimeMillis()} milliseconds; the job calls
     * {@code trigger(timestamp)} on the registered {@link Triggerable} while holding
     * {@code obj}, so callbacks are serialized with test code that synchronizes on the
     * same object.
     *
     * @param scheduledExecutorService executor on which the timer callbacks are scheduled
     * @param obj                      lock object held while a callback runs
     * @return the stubbed task mock
     */
    private static StreamTask<?, ?> createMockTaskWithTimer(final ScheduledExecutorService scheduledExecutorService, final Object obj) {
        StreamTask<?, ?> mockTask = createMockTask();
        Mockito.doAnswer(new Answer<Void>() {
            // Must be named "answer" to implement Answer; the decompiler-renamed variant left
            // the interface method unimplemented.
            @Override
            public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                final long timestamp = (Long) invocationOnMock.getArguments()[0];
                final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
                scheduledExecutorService.schedule(new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {
                        synchronized (obj) {
                            target.trigger(timestamp);
                        }
                        return null;
                    }
                }, timestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                return null;
            }
        }).when(mockTask).registerTimer(Mockito.anyLong(), Mockito.any(Triggerable.class));
        return mockTask;
    }

    /**
     * Builds a {@link StreamConfig} on top of an empty {@link Configuration} and registers the
     * given key selector as state partitioner and the given serializer as state key serializer.
     *
     * @param keySelector    selector to set as the state partitioner
     * @param typeSerializer serializer to set as the state key serializer
     * @return the populated stream config
     */
    private static StreamConfig createTaskConfig(KeySelector<?, ?> keySelector, TypeSerializer<?> typeSerializer) {
        final Configuration emptyConfiguration = new Configuration();
        final StreamConfig taskConfig = new StreamConfig(emptyConfiguration);
        taskConfig.setStatePartitioner(keySelector);
        taskConfig.setStateKeySerializer(typeSerializer);
        return taskConfig;
    }
}
