package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

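/**
 * Unit tests for {@code AggregatingProcessingTimeWindowOperator}: constructor argument validation,
 * window/pane arithmetic, tumbling and sliding window evaluation driven by the test harness's
 * processing time, exception propagation, legacy snapshot/restore, and use of keyed state from
 * the reduce function.
 *
 * <p>Decompiled from
 * org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.class.
 */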
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
    // Mockito stand-ins for the reduce function and key selector. They are only handed to operators
    // in tests that inspect configuration (window size, slide, trigger times) and never process elements.
    // The raw casts make both assignments unchecked.
    private final ReduceFunction<String> mockFunction = (ReduceFunction) Mockito.mock(ReduceFunction.class);
    private final KeySelector<String, String> mockKeySelector = (KeySelector) Mockito.mock(KeySelector.class);
    /** Keys every tuple by its first field ({@code f0}). */
    private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector =
            new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, Integer> value) {
                    return value.f0;
                }
            };
    /** Combines two tuples into one that keeps the first tuple's {@code f0} and sums both {@code f1} values. */
    private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
            new ReduceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> left, Tuple2<Integer, Integer> right) {
                    int total = left.f1 + right.f1;
                    return new Tuple2<>(left.f0, total);
                }
            };
    // Serializer for (Integer, Integer) tuples, created from a TupleTypeInfo with two INT_TYPE_INFO fields
    // and a default ExecutionConfig. The raw TupleTypeInfo makes this assignment unchecked.
    private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}).createSerializer(new ExecutionConfig());
    /**
     * Orders tuples by {@code f0} and breaks ties on {@code f1}.
     *
     * <p>Uses {@link Integer#compare(int, int)} instead of subtracting the operands: a difference of two
     * ints can overflow (e.g. a large positive minus a large negative value) and flip the sign of the result.
     */
    private final Comparator<Tuple2<Integer, Integer>> tupleComparator = new Comparator<Tuple2<Integer, Integer>>() {
        @Override
        public int compare(Tuple2<Integer, Integer> left, Tuple2<Integer, Integer> right) {
            int byFirstField = Integer.compare(left.f0, right.f0);
            return byFirstField != 0 ? byFirstField : Integer.compare(left.f1, right.f1);
        }
    };

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest$FailingFunction.class */
    private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
        private final int failAfterElements;
        private int numElements;

        FailingFunction(int i) {
            this.failAfterElements = i;
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
            this.numElements++;
            if (this.numElements >= this.failAfterElements) {
                throw new Exception("Artificial Test Exception");
            }
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest$StatefulFunction.class */
    private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap();
        private ValueState<Integer> state;

        private StatefulFunction() {
        }

        public void open(Configuration configuration) {
            Assert.assertNotNull(getRuntimeContext());
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("totalCount", Integer.class, 1));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
            this.state.update(Integer.valueOf(((Integer) this.state.value()).intValue() + 1));
            globalCounts.put(tuple2.f0, this.state.value());
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /** Runs the closure cleaner over the two anonymous function fields before any test uses them. */
    public AggregatingAlignedProcessingTimeWindowOperatorTest() {
        // NOTE(review): presumably this detaches the anonymous classes from the enclosing test instance
        // so they can be shipped to the operator; confirm against ClosureCleaner.
        ClosureCleaner.clean(fieldOneSelector, false);
        ClosureCleaner.clean(sumFunction, false);
    }

    /**
     * Runs after every test: polls {@code StreamTask.TRIGGER_THREAD_GROUP} every 10 ms for up to five
     * seconds and then asserts that the group has no active threads left.
     */
    @After
    public void checkNoTriggerThreadsRunning() {
        final long deadline = System.currentTimeMillis() + 5000;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the test runner and stop waiting. Looping on would make
                // every further sleep() throw immediately and turn the wait into a busy spin.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Assert.assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
    }

    /**
     * Verifies that the operator constructor rejects each of the listed (window size, window slide) pairs
     * with an {@link IllegalArgumentException}.
     *
     * <p>Unexpected exceptions are left to propagate so the test report carries their type and stack
     * trace instead of a bare message.
     */
    @Test
    public void testInvalidParameters() {
        assertInvalidParameter(-1L, -1L);
        assertInvalidParameter(10000L, -1L);
        assertInvalidParameter(-1L, 1000L);
        assertInvalidParameter(1000L, 2000L);
        assertInvalidParameter(1000L, 999L);
    }

    /**
     * Verifies the window size, slide, pane size and panes-per-window the operator reports for several
     * size/slide combinations.
     *
     * @throws Exception if constructing an operator fails unexpectedly; propagated so the report keeps
     *     the full stack trace
     */
    @Test
    public void testWindowSizeAndSlide() throws Exception {
        assertWindowGeometry(5000L, 1000L, 1000L, 5L);
        assertWindowGeometry(1000L, 1000L, 1000L, 1L);
        assertWindowGeometry(1500L, 1000L, 500L, 3L);
        assertWindowGeometry(1200L, 1100L, 100L, 12L);
    }

    /** Builds an operator with the given size and slide and checks the four values it derives from them. */
    private void assertWindowGeometry(long windowSize, long windowSlide, long expectedPaneSize, long expectedPanesPerWindow) throws Exception {
        AggregatingProcessingTimeWindowOperator<String, String> op = new AggregatingProcessingTimeWindowOperator<>(
                mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, windowSize, windowSlide);
        Assert.assertEquals(windowSize, op.getWindowSize());
        Assert.assertEquals(windowSlide, op.getWindowSlide());
        Assert.assertEquals(expectedPaneSize, op.getPaneSize());
        Assert.assertEquals(expectedPanesPerWindow, op.getNumPanesPerWindow());
    }

    /**
     * Verifies that, once opened, the operator's next slide time and next evaluation time are whole
     * multiples of the expected granularities for several size/slide combinations.
     *
     * @throws Exception if opening or closing a harness fails; propagated so the report keeps the
     *     full stack trace
     */
    @Test
    public void testWindowTriggerTimeAlignment() throws Exception {
        assertTriggerAlignment(5000L, 1000L, 1000L, 1000L);
        assertTriggerAlignment(1000L, 1000L, 1000L, 1000L);
        assertTriggerAlignment(1500L, 1000L, 500L, 1000L);
        assertTriggerAlignment(1200L, 1100L, 100L, 1100L);
    }

    /** Opens an operator with the given size and slide and checks both trigger times against their granularity. */
    private void assertTriggerAlignment(long windowSize, long windowSlide, long slideGranularity, long evaluationGranularity) throws Exception {
        AggregatingProcessingTimeWindowOperator<String, String> op = new AggregatingProcessingTimeWindowOperator<>(
                mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, windowSize, windowSlide);
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.open();
        Assert.assertTrue(op.getNextSlideTime() % slideGranularity == 0);
        Assert.assertTrue(op.getNextEvaluationTime() % evaluationGranularity == 0);
        testHarness.close();
    }

    /**
     * Sends 1000 tuples with distinct keys through an operator with window size 50 and slide 50,
     * advancing the harness's processing time by 10 after each element, and expects every tuple to be
     * emitted exactly once and unchanged.
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testTumblingWindowUniqueElements() throws Exception {
        final int numElements = 1000;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 50L, 50L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();

        long currentTime = 0;
        for (int i = 0; i < numElements; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
            currentTime += 10;
            testHarness.setProcessingTime(currentTime);
        }

        List<Tuple2<Integer, Integer>> results = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
        Assert.assertEquals(numElements, results.size());
        testHarness.close();

        // Keys are unique, so after sorting, position i must hold the tuple (i, i).
        Collections.sort(results, tupleComparator);
        for (int i = 0; i < numElements; i++) {
            Assert.assertEquals(i, results.get(i).f0.intValue());
            Assert.assertEquals(i, results.get(i).f1.intValue());
        }
    }

    /**
     * Feeds elements whose key and value are derived from the operator's next evaluation time, one per
     * processing-time tick, until that time has changed ten times. Expects between 10 and 20 results
     * in total and exactly ten distinct result tuples.
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testTumblingWindowDuplicateElements() throws Exception {
        final int numWindows = 10;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 50L, 50L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setProcessingTime(0L);
        testHarness.open();

        long previousNextTime = 0;
        int window = 1;
        long currentTime = 0;
        while (window <= numWindows) {
            long nextTime = op.getNextEvaluationTime();
            // Fold the 64-bit evaluation time into an int; it serves as both key and value.
            int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(val, val)));
            if (nextTime != previousNextTime) {
                window++;
                previousNextTime = nextTime;
            }
            currentTime++;
            testHarness.setProcessingTime(currentTime);
        }
        // Jump well past the last window so that any pending result is emitted.
        testHarness.setProcessingTime(currentTime + 100);

        List<Tuple2<Integer, Integer>> results = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
        testHarness.close();

        Assert.assertTrue("Unexpected number of results: " + results.size(),
                results.size() >= numWindows && results.size() <= 2 * numWindows);
        Assert.assertEquals(numWindows, new HashSet<>(results).size());
    }

    /**
     * Sends 1000 tuples with distinct keys through an operator with window size 150 and slide 50, one
     * per processing-time tick. Expects between 1000 and 3000 results, each with {@code f0 == f1}, and
     * no key appearing more than three times.
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testSlidingWindow() throws Exception {
        final int numElements = 1000;
        // Window size 150 / slide 50: an element can be part of at most three windows.
        final int maxOccurrences = 3;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 150L, 50L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();

        long currentTime = 0;
        for (int i = 0; i < numElements; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
            currentTime++;
            testHarness.setProcessingTime(currentTime);
        }

        // Fully parameterised element type so the list can be sorted with tupleComparator.
        List<Tuple2<Integer, Integer>> results = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
        testHarness.close();

        if (results.size() < numElements || results.size() > maxOccurrences * numElements) {
            System.out.println(results);
            Assert.fail("Wrong number of results: " + results.size());
        }

        // After sorting, equal keys are adjacent, so a single pass can count occurrences per key.
        Collections.sort(results, tupleComparator);
        int lastKey = -1;
        int occurrences = -1;
        for (Tuple2<Integer, Integer> result : results) {
            Assert.assertEquals(result.f0, result.f1);
            if (result.f0 == lastKey) {
                occurrences++;
                Assert.assertTrue(occurrences <= maxOccurrences);
            } else {
                lastKey = result.f0;
                occurrences = 1;
            }
        }
    }

    /**
     * Sends two tuples at processing time 0 through an operator with window size 150 and slide 50,
     * advances time to 50, 100 and 150, and expects each tuple to be emitted exactly three times.
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testSlidingWindowSingleElements() throws Exception {
        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 150L, 50L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, 1)));
        testHarness.processElement(new StreamRecord<>(new Tuple2<>(2, 2)));

        testHarness.setProcessingTime(50L);
        testHarness.setProcessingTime(100L);
        testHarness.setProcessingTime(150L);

        List<Tuple2<Integer, Integer>> results = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
        Assert.assertEquals(6L, results.size());

        Collections.sort(results, tupleComparator);
        List<Tuple2<Integer, Integer>> expected = Arrays.asList(
                new Tuple2<>(1, 1), new Tuple2<>(1, 1), new Tuple2<>(1, 1),
                new Tuple2<>(2, 2), new Tuple2<>(2, 2), new Tuple2<>(2, 2));
        Assert.assertEquals(expected, results);

        testHarness.close();
    }

    /**
     * Verifies that an exception thrown by the reduce function surfaces from
     * {@code processElement}: the first 100 elements for one key are accepted, and the next one must
     * fail with the function's "Artificial Test Exception" message.
     *
     * @throws Exception if setup or the first 100 elements fail unexpectedly; propagated so the report
     *     keeps the full stack trace
     */
    @Test
    public void testPropagateExceptionsFromProcessElement() throws Exception {
        // 100 years in milliseconds (100 * 365 * 24 * 60 * 60 * 1000), so no window fires during the test.
        final long hundredYears = 3153600000000L;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                new FailingFunction(100), fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();

        for (int i = 0; i < 100; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, 1)));
        }

        try {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, 1)));
            // fail() throws an AssertionError, which the catch below does not intercept.
            Assert.fail("This fail with an exception");
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Artificial Test Exception"));
        }

        op.dispose();
    }

    /**
     * Snapshot/restore with window size 200 and slide 200. Processes 700 elements, takes a legacy
     * snapshot, processes 300 more on the original operator, and disposes it. A second operator is
     * restored from the snapshot and receives the same 300 elements again. The results captured before
     * the snapshot plus the restored operator's output must contain every element exactly once.
     *
     * @throws Exception if a harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void checkpointRestoreWithPendingWindowTumbling() throws Exception {
        final long windowSize = 200L;
        final int numElementsFirst = 700;
        final int numElements = 1000;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize);
        OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(op);
        testHarness.setProcessingTime(0L);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < numElementsFirst; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }

        // Capture what was emitted before the snapshot; the snapshot itself must not emit anything.
        List<Tuple2<Integer, Integer>> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
        int beforeSnapshot = resultAtSnapshot.size();
        StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
        Assert.assertEquals("operator performed computation during snapshot", beforeSnapshot, testHarness.getOutput().size());
        Assert.assertTrue(resultAtSnapshot.size() <= numElementsFirst);

        // These elements arrive after the snapshot and are not part of it.
        for (int i = numElementsFirst; i < numElements; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }
        testHarness.close();
        op.dispose();

        // Restore into a fresh operator and replay the post-snapshot elements.
        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> restoredOp = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize);
        OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> restoredHarness =
                new OneInputStreamOperatorTestHarness<>(restoredOp);
        restoredHarness.setup();
        restoredHarness.restore(state);
        restoredHarness.open();

        for (int i = numElementsFirst; i < numElements; i++) {
            restoredHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }
        restoredHarness.setProcessingTime(200L);

        List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
        List<Tuple2<Integer, Integer>> afterRestore = extractFromStreamRecords(restoredHarness.getOutput());
        finalResult.addAll(afterRestore);
        Assert.assertEquals(numElements, finalResult.size());

        Collections.sort(finalResult, tupleComparator);
        for (int i = 0; i < numElements; i++) {
            Assert.assertEquals(i, finalResult.get(i).f0.intValue());
            Assert.assertEquals(i, finalResult.get(i).f1.intValue());
        }

        restoredHarness.close();
        restoredOp.dispose();
    }

    /**
     * Snapshot/restore with window size 200 and slide 50. Processes 700 elements, takes a legacy
     * snapshot, processes 300 more on the original operator, and disposes it. A second operator is
     * restored from the snapshot, receives the same 300 elements again, and is advanced through
     * processing times 50 to 400 in steps of 50. The combined results must contain every element
     * exactly four times.
     *
     * @throws Exception if a harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void checkpointRestoreWithPendingWindowSliding() throws Exception {
        final long windowSize = 200L;
        final long windowSlide = 50L;
        // Window size 200 / slide 50: every element is expected in four windows.
        final int factor = 4;
        final int numElementsFirst = 700;
        final int numElements = 1000;

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
        OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(op);
        testHarness.setProcessingTime(0L);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < numElementsFirst; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }

        // Capture what was emitted before the snapshot; the snapshot itself must not emit anything.
        List<Tuple2<Integer, Integer>> resultAtSnapshot = extractFromStreamRecords(testHarness.getOutput());
        int beforeSnapshot = resultAtSnapshot.size();
        StreamStateHandle state = testHarness.snapshotLegacy(1L, System.currentTimeMillis());
        Assert.assertEquals("operator performed computation during snapshot", beforeSnapshot, testHarness.getOutput().size());
        Assert.assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);

        // These elements arrive after the snapshot and are not part of it.
        for (int i = numElementsFirst; i < numElements; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }
        testHarness.close();
        op.dispose();

        // Restore into a fresh operator and replay the post-snapshot elements.
        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> restoredOp = new AggregatingProcessingTimeWindowOperator<>(
                sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
        OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> restoredHarness =
                new OneInputStreamOperatorTestHarness<>(restoredOp);
        restoredHarness.setup();
        restoredHarness.restore(state);
        restoredHarness.open();

        for (int i = numElementsFirst; i < numElements; i++) {
            restoredHarness.processElement(new StreamRecord<>(new Tuple2<>(i, i)));
        }

        // Step through 50, 100, ..., 400 one slide at a time, as separate clock updates.
        for (long time = windowSlide; time <= 400L; time += windowSlide) {
            restoredHarness.setProcessingTime(time);
        }

        List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
        List<Tuple2<Integer, Integer>> afterRestore = extractFromStreamRecords(restoredHarness.getOutput());
        finalResult.addAll(afterRestore);
        Assert.assertEquals(factor * numElements, finalResult.size());

        // After sorting, each key occupies `factor` consecutive positions.
        Collections.sort(finalResult, tupleComparator);
        for (int i = 0; i < factor * numElements; i++) {
            Assert.assertEquals(i / factor, finalResult.get(i).f0.intValue());
            Assert.assertEquals(i / factor, finalResult.get(i).f1.intValue());
        }

        restoredHarness.close();
        restoredOp.dispose();
    }

    /**
     * Runs {@code StatefulFunction} in an operator with window size 2000 and slide 2000: sends ten
     * elements each for keys 1 and 2, advances processing time to 1000, and expects the counters
     * published for both keys to be equal and within [2, 20].
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testKeyValueStateInWindowFunctionTumbling() throws Exception {
        final int perKey = 10;

        // The map is static and shared between tests, so start from a clean slate.
        StatefulFunction.globalCounts.clear();

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                new StatefulFunction(), fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 2000L, 2000L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setProcessingTime(0L);
        testHarness.open();

        for (int i = 0; i < perKey; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, i)));
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(2, i)));
        }
        testHarness.setProcessingTime(1000L);

        int count1 = StatefulFunction.globalCounts.get(1);
        int count2 = StatefulFunction.globalCounts.get(2);
        Assert.assertTrue("Unexpected count for key 1: " + count1, count1 >= 2 && count1 <= 2 * perKey);
        Assert.assertEquals(count1, count2);

        testHarness.close();
        op.dispose();
    }

    /**
     * Runs {@code StatefulFunction} in an operator with window size 100 and slide 50: sends 200
     * elements each for keys 1 and 2 (interleaved), advances processing time through 50, 100, 150 and
     * 200, and expects the counters published for both keys to be equal and within [2, 200].
     *
     * @throws Exception if the harness or operator fails; propagated so the report keeps the full
     *     stack trace
     */
    @Test
    public void testKeyValueStateInWindowFunctionSliding() throws Exception {
        final int rounds = 100;

        // The map is static and shared between tests, so start from a clean slate.
        StatefulFunction.globalCounts.clear();

        AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>(
                new StatefulFunction(), fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 100L, 50L);
        KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setProcessingTime(0L);
        testHarness.open();

        // Each round sends key 1, key 2, key 1, key 2, all carrying the round number as value.
        for (int i = 0; i < rounds; i++) {
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, i)));
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(2, i)));
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(1, i)));
            testHarness.processElement(new StreamRecord<>(new Tuple2<>(2, i)));
        }

        testHarness.setProcessingTime(50L);
        testHarness.setProcessingTime(100L);
        testHarness.setProcessingTime(150L);
        testHarness.setProcessingTime(200L);

        int count1 = StatefulFunction.globalCounts.get(1);
        int count2 = StatefulFunction.globalCounts.get(2);
        Assert.assertTrue("Unexpected count for key 1: " + count1, count1 >= 2 && count1 <= 2 * rounds);
        Assert.assertEquals(count1, count2);

        testHarness.close();
        op.dispose();
    }

    /**
     * Asserts that constructing the operator with the given arguments is rejected with an
     * {@link IllegalArgumentException}.
     *
     * @param j value passed to the constructor as its fifth argument (the window size in the other tests)
     * @param j2 value passed to the constructor as its sixth argument (the window slide in the other tests)
     */
    private void assertInvalidParameter(long j, long j2) {
        try {
            new AggregatingProcessingTimeWindowOperator<>(
                    mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, j, j2);
        } catch (IllegalArgumentException expected) {
            // This is the outcome the caller is asking for.
            return;
        } catch (Exception unexpected) {
            Assert.fail("Wrong exception. Expected IllegalArgumentException but found " + unexpected.getClass().getSimpleName());
        }
        // Reached only when the constructor returned normally.
        Assert.fail("This should fail with an IllegalArgumentException");
    }

    /**
     * Collects the values of all {@code StreamRecord} elements in {@code iterable}, in iteration order,
     * skipping every element that is not a {@code StreamRecord}.
     *
     * @param iterable harness output to scan; may contain objects other than stream records
     * @param <T> value type the caller expects; not checked at runtime
     * @return a new mutable list of the extracted values
     */
    private <T> List<T> extractFromStreamRecords(Iterable<?> iterable) {
        List<T> values = new ArrayList<>();
        for (Object element : iterable) {
            if (element instanceof StreamRecord) {
                // The element type is chosen by the caller; a mismatch only surfaces when a value is used.
                @SuppressWarnings("unchecked")
                T value = (T) ((StreamRecord<?>) element).getValue();
                values.add(value);
            }
        }
        return values;
    }
}
