package org.apache.flink.streaming.api.operators.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.FullStream;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.class */
public class WindowIntegrationTest implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Integer MEMORYSIZE = 32;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$IdentityWindowMap.class */
    public static class IdentityWindowMap implements WindowMapFunction<Integer, StreamWindow<Integer>> {
        public void mapWindow(Iterable<Integer> iterable, Collector<StreamWindow<Integer>> collector) throws Exception {
            StreamWindow streamWindow = new StreamWindow();
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                streamWindow.add(it.next());
            }
            collector.collect(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$ModKey.class */
    public static class ModKey implements KeySelector<Integer, Integer> {
        private int m;

        public ModKey(int i) {
            this.m = i;
        }

        public Integer getKey(Integer num) throws Exception {
            return Integer.valueOf(num.intValue() % this.m);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink1.class */
    private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink1() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink10.class */
    private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink10() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink11.class */
    private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink11() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink12.class */
    private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink12() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink13.class */
    private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink13() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink2.class */
    private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink2() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink3.class */
    private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink3() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink4.class */
    private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink4() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink5.class */
    private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink5() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink6.class */
    private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink6() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink7.class */
    private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink7() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink8.class */
    private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink8() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest$TestSink9.class */
    private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
        public static List<StreamWindow<Integer>> windows = Collections.synchronizedList(new ArrayList());

        private TestSink9() {
        }

        public void invoke(StreamWindow<Integer> streamWindow) throws Exception {
            windows.add(streamWindow);
        }
    }

    /**
     * Builds thirteen windowed pipelines on one environment, runs them in a
     * single job, and compares the windows collected by each {@code TestSinkN}
     * with the expected windows. Comparison is order-insensitive (see
     * {@link #validateOutput(List, List)}).
     *
     * <p>Three invalid window definitions are also constructed and must be
     * rejected with an exception while the topology is being built.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void test() throws Exception {
        // Start time handed to every time-based policy that takes one. Kept as a
        // dedicated constant so test data does not depend on serialVersionUID.
        final long startTime = 1L;

        List<Integer> inputs = new ArrayList<Integer>();
        inputs.add(1);
        inputs.add(2);
        inputs.add(2);
        inputs.add(3);
        inputs.add(4);
        inputs.add(5);
        inputs.add(10);
        inputs.add(11);
        inputs.add(11);

        ModKey key = new ModKey(2);

        // Each element is its own timestamp.
        Timestamp<Integer> ts = new Timestamp<Integer>() {
            private static final long serialVersionUID = 1;

            @Override
            public long getTimestamp(Integer value) {
                return value.intValue();
            }
        };

        TestStreamEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE.intValue());
        env.disableOperatorChaining();

        DataStreamSource<Integer> source = env.fromCollection(inputs);

        source.window(Time.of(3L, ts, startTime)).every(Time.of(2L, ts, startTime)).sum(0)
                .getDiscretizedStream().addSink(new TestSink1());

        source.window(Time.of(4L, ts, startTime)).groupBy(new ModKey(2))
                .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink2());

        source.groupBy(key).window(Time.of(4L, ts, startTime)).sum(0)
                .getDiscretizedStream().addSink(new TestSink4());

        source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
                .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());

        source.window(Time.of(2L, ts)).every(Time.of(3L, ts)).min(0)
                .getDiscretizedStream().addSink(new TestSink3());

        source.groupBy(key).window(Time.of(4L, ts, startTime)).max(0)
                .getDiscretizedStream().addSink(new TestSink6());

        source.window(Time.of(5L, ts, startTime)).mapWindow(new IdentityWindowMap())
                .flatten().addSink(new TestSink7());

        source.window(Time.of(5L, ts, startTime)).every(Time.of(4L, ts, startTime))
                .groupBy(new ModKey(2)).sum(0).getDiscretizedStream().addSink(new TestSink8());

        // The next three definitions must throw. Assert.fail() raises an
        // AssertionError, which the catch (Exception) clauses do not swallow.
        try {
            source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
            Assert.fail("Expected an exception for a full-stream window without a transformation");
        } catch (Exception expected) {
            // expected: the definition was rejected
        }
        try {
            source.window(FullStream.window()).getDiscretizedStream();
            Assert.fail("Expected an exception for a full-stream window without a trigger");
        } catch (Exception expected) {
            // expected: the definition was rejected
        }
        try {
            source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
            Assert.fail("Expected an exception for mapWindow on a stream defined only by every(...)");
        } catch (Exception expected) {
            // expected: the definition was rejected
        }

        source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());

        source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
                .getDiscretizedStream().addSink(new TestSink12());

        // Parallel source in which every subtask emits 1..10.
        DataStreamSource<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
            private static final long serialVersionUID = 1;

            @Override
            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i <= 10; i++) {
                    ctx.collect(i);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Parallel source that starts at (subtask index + 1) and advances in steps of 2.
        DataStreamSource<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
            private static final long serialVersionUID = 1;
            private int i = 1;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                i = 1 + getRuntimeContext().getIndexOfThisSubtask();
            }

            @Override
            public void cancel() {
            }

            @Override
            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                while (i < 11) {
                    ctx.collect(i);
                    i += 2;
                }
            }
        });

        source2.window(Time.of(2L, ts, startTime)).sum(0).getDiscretizedStream()
                .addSink(new TestSink9());

        source3.window(Time.of(5L, ts, startTime)).groupBy(new ModKey(2)).sum(0)
                .getDiscretizedStream().addSink(new TestSink10());

        source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).every(Time.of(5L, ts, startTime)).sum(0).getDiscretizedStream().addSink(new TestSink13());

        env.execute();

        List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
        expected1.add(StreamWindow.fromElements(5));
        expected1.add(StreamWindow.fromElements(11));
        expected1.add(StreamWindow.fromElements(9));
        expected1.add(StreamWindow.fromElements(10));
        expected1.add(StreamWindow.fromElements(32));
        validateOutput(expected1, TestSink1.windows);

        List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
        expected2.add(StreamWindow.fromElements(2, 2, 4));
        expected2.add(StreamWindow.fromElements(1, 3));
        expected2.add(StreamWindow.fromElements(5));
        expected2.add(StreamWindow.fromElements(10));
        expected2.add(StreamWindow.fromElements(11, 11));
        validateOutput(expected2, TestSink2.windows);

        List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
        expected4.add(StreamWindow.fromElements(4));
        expected4.add(StreamWindow.fromElements(5));
        expected4.add(StreamWindow.fromElements(22));
        expected4.add(StreamWindow.fromElements(8));
        expected4.add(StreamWindow.fromElements(10));
        validateOutput(expected4, TestSink4.windows);

        List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
        expected5.add(StreamWindow.fromElements(2, 2));
        expected5.add(StreamWindow.fromElements(1));
        expected5.add(StreamWindow.fromElements(4));
        expected5.add(StreamWindow.fromElements(5, 11));
        expected5.add(StreamWindow.fromElements(10));
        expected5.add(StreamWindow.fromElements(11));
        expected5.add(StreamWindow.fromElements(3));
        validateOutput(expected5, TestSink5.windows);

        List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
        expected3.add(StreamWindow.fromElements(1));
        expected3.add(StreamWindow.fromElements(4));
        expected3.add(StreamWindow.fromElements(10));
        validateOutput(expected3, TestSink3.windows);

        List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
        expected6.add(StreamWindow.fromElements(3));
        expected6.add(StreamWindow.fromElements(5));
        expected6.add(StreamWindow.fromElements(11));
        expected6.add(StreamWindow.fromElements(4));
        expected6.add(StreamWindow.fromElements(10));
        validateOutput(expected6, TestSink6.windows);

        List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
        expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
        expected7.add(StreamWindow.fromElements(10));
        expected7.add(StreamWindow.fromElements(10, 11, 11));
        validateOutput(expected7, TestSink7.windows);

        // From here on the collected windows are sorted in place before the
        // comparison, so the expected windows are listed in ascending order.
        List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
        expected8.add(StreamWindow.fromElements(4, 8));
        expected8.add(StreamWindow.fromElements(4, 5));
        expected8.add(StreamWindow.fromElements(10, 22));
        sortEachWindow(TestSink8.windows);
        validateOutput(expected8, TestSink8.windows);

        List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
        expected9.add(StreamWindow.fromElements(6));
        expected9.add(StreamWindow.fromElements(14));
        expected9.add(StreamWindow.fromElements(22));
        expected9.add(StreamWindow.fromElements(30));
        expected9.add(StreamWindow.fromElements(38));
        validateOutput(expected9, TestSink9.windows);

        List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
        expected10.add(StreamWindow.fromElements(6, 9));
        expected10.add(StreamWindow.fromElements(16, 24));
        sortEachWindow(TestSink10.windows);
        validateOutput(expected10, TestSink10.windows);

        List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
        expected11.add(StreamWindow.fromElements(8));
        expected11.add(StreamWindow.fromElements(38));
        expected11.add(StreamWindow.fromElements(49));
        sortEachWindow(TestSink11.windows);
        validateOutput(expected11, TestSink11.windows);

        List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
        expected12.add(StreamWindow.fromElements(4, 4));
        expected12.add(StreamWindow.fromElements(18, 20));
        expected12.add(StreamWindow.fromElements(18, 31));
        sortEachWindow(TestSink12.windows);
        validateOutput(expected12, TestSink12.windows);

        List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
        expected13.add(StreamWindow.fromElements(17));
        expected13.add(StreamWindow.fromElements(27));
        expected13.add(StreamWindow.fromElements(49));
        sortEachWindow(TestSink13.windows);
        validateOutput(expected13, TestSink13.windows);
    }

    /** Sorts the elements of every window in {@code windows} in place, in natural order. */
    private static void sortEachWindow(List<StreamWindow<Integer>> windows) {
        for (StreamWindow<Integer> window : windows) {
            Collections.sort(window);
        }
    }

    /**
     * Asserts that both lists contain the same distinct elements, ignoring
     * order and duplicates (each list is converted to a {@link HashSet} first).
     *
     * @param list expected elements
     * @param list2 actual elements
     */
    public static <R> void validateOutput(List<R> list, List<R> list2) {
        HashSet<R> expectedSet = new HashSet<R>(list);
        HashSet<R> actualSet = new HashSet<R>(list2);
        Assert.assertEquals(expectedSet, actualSet);
    }
}
