package org.apache.flink.streaming.api.complex;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest.class */
public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
    private String resultPath1;
    private String resultPath2;
    private String expected1;
    private String expected2;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$DivisorsFlatMapFunction.class */
    private static class DivisorsFlatMapFunction implements FlatMapFunction<Long, Long> {
        private static final long serialVersionUID = 1;

        private DivisorsFlatMapFunction() {
        }

        public void flatMap(Long l, Collector<Long> collector) throws Exception {
            long j = 2;
            while (true) {
                long j2 = j;
                if (j2 > l.longValue()) {
                    return;
                }
                if (l.longValue() % j2 == 0) {
                    collector.collect(Long.valueOf(j2));
                }
                j = j2 + serialVersionUID;
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Long) obj, (Collector<Long>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$FlatMapFunction6.class */
    private static class FlatMapFunction6 implements FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>, Tuple2<Integer, Tuple2<Character, Integer>>> {
        private FlatMapFunction6() {
        }

        public void flatMap(Tuple2<Date, HashMap<Character, Integer>> tuple2, Collector<Tuple2<Integer, Tuple2<Character, Integer>>> collector) throws Exception {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime((Date) tuple2.f0);
            for (Character ch : ((HashMap) tuple2.f1).keySet()) {
                collector.collect(new Tuple2(Integer.valueOf(calendar.get(2) + 1), new Tuple2(ch, ((HashMap) tuple2.f1).get(ch))));
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Date, HashMap<Character, Integer>>) obj, (Collector<Tuple2<Integer, Tuple2<Character, Integer>>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$IncrementMap.class */
    private class IncrementMap implements MapFunction<Tuple2<Long, Tuple2<String, Long>>, Tuple2<Long, Tuple2<String, Long>>> {
        private static final long serialVersionUID = 1;

        private IncrementMap() {
        }

        public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> tuple2) throws Exception {
            return new Tuple2<>(Long.valueOf(((Long) tuple2.f0).longValue() + serialVersionUID), tuple2.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$InnerPojo.class */
    public static class InnerPojo {
        public Long f0;
        public String f1;

        InnerPojo() {
        }

        public InnerPojo(Long l, String str) {
            this.f0 = l;
            this.f1 = str;
        }

        public String toString() {
            return "POJO(" + this.f0 + "," + this.f1 + ")";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MapFunction6.class */
    private static class MapFunction6 implements MapFunction<Tuple2<Date, HashMap<Character, Integer>>, ArrayList<Character>> {
        private MapFunction6() {
        }

        public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> tuple2) throws Exception {
            ArrayList<Character> arrayList = new ArrayList<>();
            for (Character ch : ((HashMap) tuple2.f1).keySet()) {
                for (int i = 0; i < ((Integer) ((HashMap) tuple2.f1).get(ch)).intValue(); i++) {
                    arrayList.add(ch);
                }
            }
            Collections.sort(arrayList);
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyCoMapFunction.class */
    private class MyCoMapFunction implements CoMapFunction<OuterPojo, OuterPojo, String> {
        private static final long serialVersionUID = 1;

        private MyCoMapFunction() {
        }

        public String map1(OuterPojo outerPojo) {
            return outerPojo.f0.f1;
        }

        public String map2(OuterPojo outerPojo) {
            return outerPojo.f0.f1;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyDelta.class */
    private static class MyDelta implements DeltaFunction<Tuple2<Rectangle, Integer>> {
        private static final long serialVersionUID = 1;

        private MyDelta() {
        }

        public double getDelta(Tuple2<Rectangle, Integer> tuple2, Tuple2<Rectangle, Integer> tuple22) {
            return (((Rectangle) tuple22.f0).b - ((Rectangle) tuple22.f0).a) - (((Rectangle) tuple2.f0).b - ((Rectangle) tuple2.f0).a);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyFlatMapFunction.class */
    private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double, Boolean>, OuterPojo> {
        private static final long serialVersionUID = 1;

        private MyFlatMapFunction() {
        }

        public void flatMap(Tuple4<Integer, String, Double, Boolean> tuple4, Collector<OuterPojo> collector) throws Exception {
            if (((Boolean) tuple4.f3).booleanValue()) {
                for (int i = 0; i < 2; i++) {
                    collector.collect(new OuterPojo(new InnerPojo(Long.valueOf(((Integer) tuple4.f0).intValue()), (String) tuple4.f1), Long.valueOf(i)));
                }
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple4<Integer, String, Double, Boolean>) obj, (Collector<OuterPojo>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyMapFunction2.class */
    private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String, Double, Boolean>> {
        private MyMapFunction2() {
        }

        public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double, Boolean> tuple5) throws Exception {
            return new Tuple4<>(tuple5.f0, ((String) tuple5.f1) + "-" + tuple5.f2, tuple5.f3, tuple5.f4);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyOutputSelector.class */
    private class MyOutputSelector implements OutputSelector<Tuple2<Long, Tuple2<String, Long>>> {
        private static final long serialVersionUID = 1;

        private MyOutputSelector() {
        }

        public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> tuple2) {
            ArrayList arrayList = new ArrayList();
            if (((Long) tuple2.f0).longValue() == 10) {
                arrayList.add("iterate");
                arrayList.add("firstOutput");
            } else if (((Long) tuple2.f0).longValue() == 20) {
                arrayList.add("secondOutput");
            } else {
                arrayList.add("iterate");
            }
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyTimestamp.class */
    private static class MyTimestamp implements Timestamp<Tuple5<Integer, String, Character, Double, Boolean>> {
        private static final long serialVersionUID = 1;

        private MyTimestamp() {
        }

        public long getTimestamp(Tuple5<Integer, String, Character, Double, Boolean> tuple5) {
            return ((Integer) tuple5.f0).intValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$MyWindowMapFunction.class */
    private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>> {
        private static final long serialVersionUID = 1;

        private MyWindowMapFunction() {
        }

        public void mapWindow(Iterable<Tuple2<Rectangle, Integer>> iterable, Collector<Tuple2<Rectangle, Integer>> collector) throws Exception {
            collector.collect(iterable.iterator().next());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$OuterPojo.class */
    public static class OuterPojo {
        public InnerPojo f0;
        public Long f1;

        public OuterPojo(InnerPojo innerPojo, Long l) {
            this.f0 = innerPojo;
            this.f1 = l;
        }

        public String toString() {
            return "POJO(" + this.f0 + "," + this.f1 + ")";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$PojoSource.class */
    private static class PojoSource implements SourceFunction<OuterPojo> {
        private static final long serialVersionUID = 1;
        long cnt;

        private PojoSource() {
            this.cnt = 0L;
        }

        public void run(SourceFunction.SourceContext<OuterPojo> sourceContext) throws Exception {
            for (int i = 0; i < 20; i++) {
                sourceContext.collect(new OuterPojo(new InnerPojo(Long.valueOf(this.cnt / 2), "water_melon-b"), 2L));
            }
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$PrimeFilterFunction.class */
    private static class PrimeFilterFunction implements FilterFunction<Long> {
        private static final long serialVersionUID = 1;

        private PrimeFilterFunction() {
        }

        public boolean filter(Long l) throws Exception {
            if (l.longValue() < 2) {
                return false;
            }
            long j = 2;
            while (true) {
                long j2 = j;
                if (j2 >= l.longValue()) {
                    return true;
                }
                if (l.longValue() % j2 == 0) {
                    return false;
                }
                j = j2 + serialVersionUID;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$Rectangle.class */
    public static class Rectangle {
        public int a;
        public int b;

        public Rectangle() {
        }

        public Rectangle(int i, int i2) {
            this.a = i;
            this.b = i2;
        }

        public Rectangle next() {
            return new Rectangle(this.a + (this.b % 11), this.b + (this.a % 9));
        }

        public String toString() {
            return "(" + this.a + "," + this.b + ")";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$RectangleMapFunction.class */
    private static class RectangleMapFunction implements MapFunction<Rectangle, Tuple2<Rectangle, Integer>> {
        private static final long serialVersionUID = 1;
        private int counter;

        private RectangleMapFunction() {
            this.counter = 0;
        }

        public Tuple2<Rectangle, Integer> map(Rectangle rectangle) throws Exception {
            int i = this.counter;
            this.counter = i + 1;
            return new Tuple2<>(rectangle, Integer.valueOf(i));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$RectangleSource.class */
    private static class RectangleSource extends RichSourceFunction<Rectangle> {
        private static final long serialVersionUID = 1;
        private transient Rectangle rectangle;

        private RectangleSource() {
        }

        public void open(Configuration configuration) throws Exception {
            this.rectangle = new Rectangle(100, 100);
        }

        public void run(SourceFunction.SourceContext<Rectangle> sourceContext) throws Exception {
            for (int i = 0; i < 100; i++) {
                sourceContext.collect(this.rectangle);
                this.rectangle = this.rectangle.next();
            }
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$SalesReduceFunction.class */
    private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
        private static final long serialVersionUID = 1;

        private SalesReduceFunction() {
        }

        public Tuple2<Date, HashMap<Character, Integer>> reduce(Tuple2<Date, HashMap<Character, Integer>> tuple2, Tuple2<Date, HashMap<Character, Integer>> tuple22) throws Exception {
            HashMap hashMap = (HashMap) tuple2.f1;
            HashMap hashMap2 = (HashMap) tuple22.f1;
            for (Character ch : hashMap2.keySet()) {
                Integer num = (Integer) hashMap.get(ch);
                Integer num2 = (Integer) hashMap2.get(ch);
                if (num == null) {
                    num = 0;
                }
                hashMap.put(ch, Integer.valueOf(num.intValue() + num2.intValue()));
            }
            return new Tuple2<>(tuple22.f0, hashMap);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$SquareFlatMapFunction.class */
    private static class SquareFlatMapFunction implements FlatMapFunction<Long, Long> {
        private static final long serialVersionUID = 1;

        private SquareFlatMapFunction() {
        }

        public void flatMap(Long l, Collector<Long> collector) throws Exception {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= l.longValue()) {
                    return;
                }
                collector.collect(l);
                j = j2 + serialVersionUID;
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Long) obj, (Collector<Long>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$Timestamp6.class */
    private static class Timestamp6 implements Timestamp<Tuple2<Date, HashMap<Character, Integer>>> {
        private Timestamp6() {
        }

        public long getTimestamp(Tuple2<Date, HashMap<Character, Integer>> tuple2) {
            Calendar.getInstance().setTime((Date) tuple2.f0);
            return (12 * r0.get(1)) + r0.get(2);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/complex/ComplexIntegrationTest$TupleSource.class */
    private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
        private static final long serialVersionUID = 1;

        private TupleSource() {
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Tuple2<String, Long>>> sourceContext) throws Exception {
            for (int i = 0; i < 20; i++) {
                sourceContext.collect(new Tuple2(Long.valueOf(serialVersionUID), new Tuple2("a", Long.valueOf(serialVersionUID))));
            }
        }

        public void cancel() {
        }
    }

    @Before
    public void before() throws Exception {
        this.resultPath1 = this.tempFolder.newFile().toURI().toString();
        this.resultPath2 = this.tempFolder.newFile().toURI().toString();
        this.expected1 = "";
        this.expected2 = "";
    }

    @After
    public void after() throws Exception {
        compareResultsByLinesInMemory(this.expected1, this.resultPath1);
        compareResultsByLinesInMemory(this.expected2, this.resultPath2);
    }

    @Test
    public void complexIntegrationTest1() throws Exception {
        this.expected1 = "";
        for (int i = 0; i < 8; i++) {
            this.expected1 += "(10,(a,1))\n";
        }
        this.expected1 += "(10,(a,1))";
        this.expected2 = "";
        for (int i2 = 0; i2 < 18; i2++) {
            this.expected2 += "(20,(a,1))\n";
        }
        this.expected2 += "(20,(a,1))";
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        IterativeDataStream iterate = executionEnvironment.addSource(new TupleSource()).setParallelism(1).sum(0).setParallelism(1).filter(new FilterFunction<Tuple2<Long, Tuple2<String, Long>>>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.1
            public boolean filter(Tuple2<Long, Tuple2<String, Long>> tuple2) throws Exception {
                return ((Long) tuple2.f0).longValue() < 20;
            }
        }).iterate(5000L);
        SplitDataStream split = iterate.map(new IncrementMap()).split(new MyOutputSelector());
        iterate.closeWith(split.select(new String[]{"iterate"}));
        split.select(new String[]{"firstOutput"}).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        split.select(new String[]{"secondOutput"}).writeAsText(this.resultPath2, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }

    @Test
    @Ignore
    public void complexIntegrationTest2() throws Exception {
        this.expected1 = "water_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\nwater_melon-b\norange-b\norange-b\norange-b\norange-b\norange-b\norange-b\norange-c\norange-c\norange-c\norange-c\norange-d\norange-d\npeach-d\npeach-d\n";
        List asList = Arrays.asList(new Tuple5(1, "apple", 'j', Double.valueOf(0.1d), false), new Tuple5(1, "peach", 'b', Double.valueOf(0.8d), false), new Tuple5(1, "orange", 'c', Double.valueOf(0.7d), true), new Tuple5(2, "apple", 'd', Double.valueOf(0.5d), false), new Tuple5(2, "peach", 'j', Double.valueOf(0.6d), false), new Tuple5(3, "orange", 'b', Double.valueOf(0.2d), true), new Tuple5(6, "apple", 'c', Double.valueOf(0.1d), false), new Tuple5(7, "peach", 'd', Double.valueOf(0.4d), false), new Tuple5(8, "orange", 'j', Double.valueOf(0.2d), true), new Tuple5(10, "apple", 'b', Double.valueOf(0.1d), false), new Tuple5(10, "peach", 'c', Double.valueOf(0.5d), false), new Tuple5(11, "orange", 'd', Double.valueOf(0.3d), true), new Tuple5(11, "apple", 'j', Double.valueOf(0.3d), false), new Tuple5(12, "peach", 'b', Double.valueOf(0.9d), false), new Tuple5(13, "orange", 'c', Double.valueOf(0.7d), true), new Tuple5(15, "apple", 'd', Double.valueOf(0.2d), false), new Tuple5(16, "peach", 'j', Double.valueOf(0.8d), false), new Tuple5(16, "orange", 'b', Double.valueOf(0.8d), true), new Tuple5(16, "apple", 'c', Double.valueOf(0.1d), false), new Tuple5(17, "peach", 'd', Double.valueOf(1.0d), true));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromCollection(asList).sum(3).groupBy(new int[]{2, 2}).window(Time.of(10L, new MyTimestamp(), 0L)).every(Time.of(4L, new MyTimestamp(), 0L)).maxBy(3).flatten().map(new MyMapFunction2()).flatMap(new MyFlatMapFunction()).connect(executionEnvironment.addSource(new PojoSource())).map(new MyCoMapFunction()).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }

    @Test
    public void complexIntegrationTest3() throws Exception {
        this.expected1 = "541\n1223\n3319\n5851\n1987\n8387\n15907\n10939\n4127\n2477\n6737\n13421\n4987\n4999\n18451\n9283\n7499\n16937\n11927\n9973\n14431\n19507\n12497\n17497\n14983\n19997\n";
        for (int i = 2; i < 100; i++) {
            this.expected2 += "(" + i + "," + (20000 / i) + ")\n";
        }
        for (int i2 = 19901; i2 < 20000; i2++) {
            this.expected2 += "(" + i2 + "," + (20000 / i2) + ")\n";
        }
        this.expected2 += "(20000,1)";
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setBufferTimeout(0L);
        DataStreamSource generateParallelSequence = executionEnvironment.generateParallelSequence(1L, 10000L);
        DataStreamSource generateParallelSequence2 = executionEnvironment.generateParallelSequence(10001L, 20000L);
        generateParallelSequence.filter(new PrimeFilterFunction()).window(Count.of(100)).max(0).flatten().union(new DataStream[]{generateParallelSequence2.filter(new PrimeFilterFunction()).window(Count.of(100)).max(0).flatten()}).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        generateParallelSequence.flatMap(new DivisorsFlatMapFunction()).union(new DataStream[]{generateParallelSequence2.flatMap(new DivisorsFlatMapFunction())}).map(new MapFunction<Long, Tuple2<Long, Integer>>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.3
            public Tuple2<Long, Integer> map(Long l) throws Exception {
                return new Tuple2<>(l, 1);
            }
        }).groupBy(new int[]{0}).window(Count.of(10000)).sum(1).flatten().filter(new FilterFunction<Tuple2<Long, Integer>>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.2
            public boolean filter(Tuple2<Long, Integer> tuple2) throws Exception {
                return ((Long) tuple2.f0).longValue() < 100 || ((Long) tuple2.f0).longValue() > 19900;
            }
        }).writeAsText(this.resultPath2, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }

    @Test
    public void complexIntegrationTest4() throws Exception {
        this.expected1 = "((100,100),0)\n((120,122),5)\n((121,125),6)\n((138,144),9)\n((139,147),10)\n((156,166),13)\n((157,169),14)\n((174,188),17)\n((175,191),18)\n((192,210),21)\n((193,213),22)\n((210,232),25)\n((211,235),26)\n((228,254),29)\n((229,257),30)\n((246,276),33)\n((247,279),34)\n((264,298),37)\n((265,301),38)\n((282,320),41)\n((283,323),42)\n((300,342),45)\n((301,345),46)\n((318,364),49)\n((319,367),50)\n((336,386),53)\n((337,389),54)\n((354,408),57)\n((355,411),58)\n((372,430),61)\n((373,433),62)\n((390,452),65)\n((391,455),66)\n((408,474),69)\n((409,477),70)\n((426,496),73)\n((427,499),74)\n((444,518),77)\n((445,521),78)\n((462,540),81)\n((463,543),82)\n((480,562),85)\n((481,565),86)\n((498,584),89)\n((499,587),90)\n((516,606),93)\n((517,609),94)\n((534,628),97)\n((535,631),98)";
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new RectangleSource()).global().map(new RectangleMapFunction()).window(Delta.of(0.0d, new MyDelta(), new Tuple2(new Rectangle(100, 100), 0))).mapWindow(new MyWindowMapFunction()).flatten().writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }

    @Test
    public void complexIntegrationTest5() throws Exception {
        this.expected1 = "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n1\n3\n3\n4\n5\n5\n6\n8\n9\n10\n12\n15\n16\n20\n25\n";
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setBufferTimeout(0L);
        SingleOutputStreamOperator flatMap = executionEnvironment.generateParallelSequence(1L, 5L).map(new MapFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.5
            public Long map(Long l) throws Exception {
                return l;
            }
        }).startNewChain().filter(new FilterFunction<Long>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.4
            public boolean filter(Long l) throws Exception {
                return true;
            }
        }).disableChaining().flatMap(new SquareFlatMapFunction());
        flatMap.map(new MapFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.8
            public Long map(Long l) throws Exception {
                return l;
            }
        }).union(new DataStream[]{flatMap.fold(0L, new FoldFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.7
            public Long fold(Long l, Long l2) throws Exception {
                return Long.valueOf(l.longValue() + l2.longValue());
            }
        }).map(new MapFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.complex.ComplexIntegrationTest.6
            public Long map(Long l) throws Exception {
                return l;
            }
        }).disableChaining()}).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }

    @Test
    public void complexIntegrationTest6() throws Exception {
        this.expected1 = "(6,(a,6))\n(6,(b,3))\n(6,(c,4))\n(6,(d,2))\n(6,(f,2))\n(7,(a,1))\n(7,(b,2))\n(7,(c,3))\n(7,(d,1))\n(7,(e,1))\n(7,(f,1))\n(8,(a,6))\n(8,(b,4))\n(8,(c,5))\n(8,(d,1))\n(8,(e,2))\n(8,(f,2))\n(9,(a,4))\n(9,(b,4))\n(9,(c,7))\n(9,(d,3))\n(9,(e,1))\n(9,(f,2))\n(10,(a,3))\n(10,(b,2))\n(10,(c,3))\n(10,(d,2))\n(10,(e,1))\n(10,(f,1))";
        this.expected2 = "[a, a, c, c, d, f]\n[a, b, b, d]\n[a, a, a, b, c, c, f]\n[a, d, e]\n[b, b, c, c, c, f]\n[a, a, a, a, b, b, c, c, e]\n[a, a, b, b, c, c, c, d, e, f, f]\n[a, a, a, b, c, c, c, d, d, f]\n[a, b, b, b, c, c, c, c, d, e, f]\n[a, a, a, b, b, c, c, c, d, d, e, f]";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd-MM-yyyy");
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        hashMap.put('a', 2);
        hashMap.put('c', 2);
        hashMap.put('d', 1);
        hashMap.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("03-06-2014"), hashMap));
        HashMap hashMap2 = new HashMap();
        hashMap2.put('a', 1);
        hashMap2.put('b', 2);
        hashMap2.put('d', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("10-06-2014"), hashMap2));
        HashMap hashMap3 = new HashMap();
        hashMap3.put('a', 3);
        hashMap3.put('b', 1);
        hashMap3.put('c', 2);
        hashMap3.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("29-06-2014"), hashMap3));
        HashMap hashMap4 = new HashMap();
        hashMap4.put('a', 1);
        hashMap4.put('d', 1);
        hashMap4.put('e', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("15-07-2014"), hashMap4));
        HashMap hashMap5 = new HashMap();
        hashMap5.put('b', 2);
        hashMap5.put('c', 3);
        hashMap5.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("24-07-2014"), hashMap5));
        HashMap hashMap6 = new HashMap();
        hashMap6.put('a', 4);
        hashMap6.put('b', 2);
        hashMap6.put('c', 2);
        hashMap6.put('e', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("17-08-2014"), hashMap6));
        HashMap hashMap7 = new HashMap();
        hashMap7.put('a', 2);
        hashMap7.put('b', 2);
        hashMap7.put('c', 3);
        hashMap7.put('d', 1);
        hashMap7.put('e', 1);
        hashMap7.put('f', 2);
        arrayList.add(new Tuple2(simpleDateFormat.parse("27-08-2014"), hashMap7));
        HashMap hashMap8 = new HashMap();
        hashMap8.put('a', 3);
        hashMap8.put('b', 1);
        hashMap8.put('c', 3);
        hashMap8.put('d', 2);
        hashMap8.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("16-09-2014"), hashMap8));
        HashMap hashMap9 = new HashMap();
        hashMap9.put('a', 1);
        hashMap9.put('b', 3);
        hashMap9.put('c', 4);
        hashMap9.put('d', 1);
        hashMap9.put('e', 1);
        hashMap9.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("25-09-2014"), hashMap9));
        HashMap hashMap10 = new HashMap();
        hashMap10.put('a', 3);
        hashMap10.put('b', 2);
        hashMap10.put('c', 3);
        hashMap10.put('d', 2);
        hashMap10.put('e', 1);
        hashMap10.put('f', 1);
        arrayList.add(new Tuple2(simpleDateFormat.parse("01-10-2014"), hashMap10));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource fromCollection = executionEnvironment.fromCollection(arrayList);
        fromCollection.window(Time.of(1L, new Timestamp6())).reduceWindow(new SalesReduceFunction()).flatten().flatMap(new FlatMapFunction6()).writeAsText(this.resultPath1, FileSystem.WriteMode.OVERWRITE);
        fromCollection.map(new MapFunction6()).writeAsText(this.resultPath2, FileSystem.WriteMode.OVERWRITE);
        executionEnvironment.execute();
    }
}
