package org.apache.flink.streaming.api;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest.class */
public class DataStreamTest extends StreamingMultipleProgramsTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest$CustomPOJO.class */
    /**
     * Minimal mutable bean exposing one {@code String} property ({@code s}) and one
     * {@code int} property ({@code i}) through conventional getters and setters.
     *
     * <p>{@code testTypeInfo} in this class uses it as the result type of a fold in
     * order to check the type information extracted for a user-defined class.
     */
    public static class CustomPOJO {
        private String s;
        private int i;

        /** Returns the current string property; {@code null} until it has been set. */
        public String getS() {
            return s;
        }

        /** Replaces the string property. */
        public void setS(String value) {
            s = value;
        }

        /** Returns the current integer property; {@code 0} until it has been set. */
        public int getI() {
            return i;
        }

        /** Replaces the integer property. */
        public void setI(int value) {
            i = value;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest$FirstSelector.class */
    private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
        private static final long serialVersionUID = 1;

        private FirstSelector() {
        }

        public Long getKey(Tuple2<Long, Long> tuple2) throws Exception {
            return (Long) tuple2.f0;
        }
    }

    /**
     * Checks which partitioner the stream graph assigns to each input edge of an
     * operator that consumes a union.
     *
     * <p>Four scenarios are built in one environment (default parallelism 4):
     * <ol>
     *   <li>a stream unioned with itself: both edges are expected to be forward;</li>
     *   <li>a broadcast view of a stream unioned with the stream itself: one forward
     *       and one broadcast edge are expected;</li>
     *   <li>inputs of parallelism 4 and 2 feeding a parallelism-4 operator: forward
     *       from the first input and rebalance from the second are expected;</li>
     *   <li>a broadcast parallelism-2 input unioned with a parallelism-4 input:
     *       broadcast from the first input and forward from the second are expected.</li>
     * </ol>
     *
     * <p>All map functions are placeholders returning {@code null}; the job is never
     * executed, only its graph is inspected.
     */
    @Test
    @SuppressWarnings("unchecked") // generic varargs arrays created by DataStream#union
    public void testUnion() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Scenario 1: self union.
        DataStream<Long> input1 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        });
        DataStream<Long> selfUnion = input1.union(input1).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        });

        // Scenario 2: self union where one side is explicitly broadcast.
        DataStream<Long> input6 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        });
        DataStream<Long> selfUnionDifferentPartition =
                input6.broadcast().union(input6).map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return null;
                    }
                });

        // Scenario 3: inputs with differing parallelism.
        DataStream<Long> input2 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        DataStream<Long> input3 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        DataStream<Long> unionDifferingParallelism =
                input2.union(input3).map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return null;
                    }
                }).setParallelism(4);

        // Scenario 4: differing parallelism with an explicit broadcast on one side.
        DataStream<Long> input4 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        DataStream<Long> input5 = env.generateSequence(0L, 0L).map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        DataStream<Long> unionDifferingPartitioning =
                input4.broadcast().union(input5).map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return null;
                    }
                }).setParallelism(4);

        StreamGraph streamGraph = env.getStreamGraph();

        // Scenario 1: two forward edges.
        Assert.assertEquals(2, streamGraph.getStreamNode(selfUnion.getId()).getInEdges().size());
        for (StreamEdge edge : streamGraph.getStreamNode(selfUnion.getId()).getInEdges()) {
            Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
        }

        // Scenario 2: one forward and one broadcast edge, in either order.
        Assert.assertEquals(
                2, streamGraph.getStreamNode(selfUnionDifferentPartition.getId()).getInEdges().size());
        boolean sawForward = false;
        boolean sawBroadcast = false;
        for (StreamEdge edge
                : streamGraph.getStreamNode(selfUnionDifferentPartition.getId()).getInEdges()) {
            if (edge.getPartitioner() instanceof ForwardPartitioner) {
                sawForward = true;
            }
            if (edge.getPartitioner() instanceof BroadcastPartitioner) {
                sawBroadcast = true;
            }
        }
        Assert.assertTrue(sawForward && sawBroadcast);

        // Scenario 3: the partitioner depends on which input the edge comes from.
        Assert.assertEquals(
                2, streamGraph.getStreamNode(unionDifferingParallelism.getId()).getInEdges().size());
        for (StreamEdge edge
                : streamGraph.getStreamNode(unionDifferingParallelism.getId()).getInEdges()) {
            if (edge.getSourceId() == input2.getId().intValue()) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
            } else if (edge.getSourceId() == input3.getId().intValue()) {
                Assert.assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
            } else {
                Assert.fail("Wrong input edge.");
            }
        }

        // Scenario 4: the explicit broadcast is kept, the other input is forwarded.
        Assert.assertEquals(
                2, streamGraph.getStreamNode(unionDifferingPartitioning.getId()).getInEdges().size());
        for (StreamEdge edge
                : streamGraph.getStreamNode(unionDifferingPartitioning.getId()).getInEdges()) {
            if (edge.getSourceId() == input4.getId().intValue()) {
                Assert.assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
            } else if (edge.getSourceId() == input5.getId().intValue()) {
                Assert.assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
            } else {
                Assert.fail("Wrong input edge.");
            }
        }
    }

    /**
     * Checks that names assigned with {@code name(...)} to sources and operators
     * appear in the execution plan returned by the environment.
     *
     * <p>The pipeline is: two named sources, each followed by a map named
     * {@code "testMap"}, connected into a co-flat-map, followed by a count-triggered
     * global window fold that is printed. All user functions are empty placeholders;
     * only the generated plan text is inspected.
     */
    @Test
    public void testNaming() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> firstMapped = env.generateSequence(0L, 0L).name("testSource1")
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return null;
                    }
                }).name("testMap");

        DataStream<Long> secondMapped = env.generateSequence(0L, 0L).name("testSource2")
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        return null;
                    }
                }).name("testMap");

        // Only the typed methods are declared: the compiler generates the erased
        // bridge methods itself, and declaring them by hand is an erasure name clash.
        firstMapped.connect(secondMapped)
                .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void flatMap1(Long value, Collector<Long> out) throws Exception {
                    }

                    @Override
                    public void flatMap2(Long value, Collector<Long> out) throws Exception {
                    }
                })
                .name("testCoFlatMap")
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(10L)))
                .fold(0L, new FoldFunction<Long, Long>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Long fold(Long accumulator, Long value) throws Exception {
                        return null;
                    }
                })
                .name("testWindowFold")
                .print();

        String plan = env.getExecutionPlan();

        Assert.assertTrue(plan.contains("testSource1"));
        Assert.assertTrue(plan.contains("testSource2"));
        // Both map operators share this name, so a single containment check covers them.
        Assert.assertTrue(plan.contains("testMap"));
        Assert.assertTrue(plan.contains("testCoFlatMap"));
        Assert.assertTrue(plan.contains("testWindowFold"));
    }

    /**
     * Checks, for single and connected streams, that the keying and partitioning
     * variants produce the expected kind of edge in the stream graph and the
     * expected "keyed" status as reported by the {@code isKeyed} helpers.
     *
     * <p>Covered variants: {@code keyBy} and {@code partitionByHash} by position,
     * by field expression and by {@link FirstSelector}, plus {@code partitionCustom}
     * on the single stream. The test expects {@code keyBy} results to be keyed and
     * {@code partitionByHash} / {@code partitionCustom} results not to be.
     *
     * <p>Each stream gets a downstream node through {@code createDownStreamId} so
     * that the edge between the source and that node can be looked up.
     */
    @Test
    public void testPartitioning() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource src1 = env.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        DataStreamSource src2 = env.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        ConnectedStreams connected = src1.connect(src2);
        int src1Id = src1.getId().intValue();
        int src2Id = src2.getId().intValue();

        // ---- single stream: keyBy ----
        DataStream<?>[] keyedStreams = {
            src1.keyBy(new int[]{0}),
            src1.keyBy(new int[]{1, 0}),
            src1.keyBy(new String[]{"f0"}),
            src1.keyBy(new FirstSelector())
        };
        int[] keyedIds = new int[keyedStreams.length];
        for (int k = 0; k < keyedStreams.length; k++) {
            keyedIds[k] = createDownStreamId(keyedStreams[k]).intValue();
        }
        for (int downstreamId : keyedIds) {
            Assert.assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1Id, downstreamId)));
        }
        for (DataStream<?> keyed : keyedStreams) {
            Assert.assertTrue(isKeyed(keyed));
        }

        // ---- single stream: partitionByHash ----
        DataStream<?>[] hashStreams = {
            src1.partitionByHash(new int[]{0}),
            src1.partitionByHash(new int[]{1, 0}),
            src1.partitionByHash(new String[]{"f0"}),
            src1.partitionByHash(new FirstSelector())
        };
        int[] hashIds = new int[hashStreams.length];
        for (int k = 0; k < hashStreams.length; k++) {
            hashIds[k] = createDownStreamId(hashStreams[k]).intValue();
        }
        for (int downstreamId : hashIds) {
            Assert.assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1Id, downstreamId)));
        }
        for (DataStream<?> hashed : hashStreams) {
            Assert.assertFalse(isKeyed(hashed));
        }

        // ---- single stream: partitionCustom ----
        Partitioner<Long> constantPartitioner = new Partitioner<Long>() {
            public int partition(Long key, int numPartitions) {
                return 100;
            }
        };
        DataStream<?>[] customStreams = {
            src1.partitionCustom(constantPartitioner, 0),
            src1.partitionCustom(constantPartitioner, "f0"),
            src1.partitionCustom(constantPartitioner, new FirstSelector())
        };
        int[] customIds = new int[customStreams.length];
        for (int k = 0; k < customStreams.length; k++) {
            customIds[k] = createDownStreamId(customStreams[k]).intValue();
        }
        for (int downstreamId : customIds) {
            Assert.assertTrue(
                    isCustomPartitioned(env.getStreamGraph().getStreamEdges(src1Id, downstreamId)));
        }
        for (DataStream<?> custom : customStreams) {
            Assert.assertFalse(isKeyed(custom));
        }

        // ---- connected streams: keyBy ----
        // Creation and downstream-node registration stay interleaved, one variant at a time.
        ConnectedStreams keyedByPosition = connected.keyBy(0, 0);
        Integer keyedByPositionId = createDownStreamId(keyedByPosition);
        ConnectedStreams keyedByPositions = connected.keyBy(new int[]{0}, new int[]{0});
        Integer keyedByPositionsId = createDownStreamId(keyedByPositions);
        ConnectedStreams keyedByField = connected.keyBy("f0", "f0");
        Integer keyedByFieldId = createDownStreamId(keyedByField);
        ConnectedStreams keyedByFields = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
        Integer keyedByFieldsId = createDownStreamId(keyedByFields);
        ConnectedStreams keyedBySelector = connected.keyBy(new FirstSelector(), new FirstSelector());
        Integer keyedBySelectorId = createDownStreamId(keyedBySelector);

        Integer[] connectedKeyedIds = {
            keyedByPositionId, keyedByPositionsId, keyedByFieldId, keyedByFieldsId, keyedBySelectorId
        };
        for (Integer downstreamId : connectedKeyedIds) {
            Assert.assertTrue(isPartitioned(
                    env.getStreamGraph().getStreamEdges(src1Id, downstreamId.intValue())));
            Assert.assertTrue(isPartitioned(
                    env.getStreamGraph().getStreamEdges(src2Id, downstreamId.intValue())));
        }
        ConnectedStreams<?, ?>[] connectedKeyed = {
            keyedByPosition, keyedByPositions, keyedByField, keyedByFields, keyedBySelector
        };
        for (ConnectedStreams<?, ?> keyed : connectedKeyed) {
            Assert.assertTrue(isKeyed(keyed));
        }

        // ---- connected streams: partitionByHash ----
        ConnectedStreams hashedByPosition = connected.partitionByHash(0, 0);
        Integer hashedByPositionId = createDownStreamId(hashedByPosition);
        ConnectedStreams hashedByPositions = connected.partitionByHash(new int[]{0}, new int[]{0});
        Integer hashedByPositionsId = createDownStreamId(hashedByPositions);
        ConnectedStreams hashedByField = connected.partitionByHash("f0", "f0");
        Integer hashedByFieldId = createDownStreamId(hashedByField);
        ConnectedStreams hashedByFields =
                connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
        Integer hashedByFieldsId = createDownStreamId(hashedByFields);
        ConnectedStreams hashedBySelector =
                connected.partitionByHash(new FirstSelector(), new FirstSelector());
        Integer hashedBySelectorId = createDownStreamId(hashedBySelector);

        Integer[] connectedHashedIds = {
            hashedByPositionId, hashedByPositionsId, hashedByFieldId, hashedByFieldsId,
            hashedBySelectorId
        };
        for (Integer downstreamId : connectedHashedIds) {
            Assert.assertTrue(isPartitioned(
                    env.getStreamGraph().getStreamEdges(src1Id, downstreamId.intValue())));
            Assert.assertTrue(isPartitioned(
                    env.getStreamGraph().getStreamEdges(src2Id, downstreamId.intValue())));
        }
        ConnectedStreams<?, ?>[] connectedHashed = {
            hashedByPosition, hashedByPositions, hashedByField, hashedByFields, hashedBySelector
        };
        for (ConnectedStreams<?, ?> hashed : connectedHashed) {
            Assert.assertFalse(isKeyed(hashed));
        }
    }

    /**
     * Checks how operator parallelism is derived and overridden.
     *
     * <p>Expectations encoded by the assertions:
     * <ul>
     *   <li>the {@code fromElements} source and the all-window fold report
     *       parallelism 1, while the map and its sink pick up the environment
     *       parallelism (10) that was set before they were created;</li>
     *   <li>changing the environment parallelism afterwards (to 7) does not affect
     *       operators that already exist, but does apply to a source created later;</li>
     *   <li>{@code setParallelism(3)} on the {@code fromElements} source is rejected
     *       with {@link IllegalArgumentException};</li>
     *   <li>explicit {@code setParallelism} calls on a sequence source, a map and a
     *       sink are reflected in the stream graph.</li>
     * </ul>
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw stream types are used throughout this test
    public void testParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource src = env.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        env.setParallelism(10);

        SingleOutputStreamOperator map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
            @Override
            public Long map(Tuple2<Long, Long> value) throws Exception {
                return null;
            }
        }).name("MyMap");

        // Keep a handle on the windowed fold so its node can be looked up below.
        DataStream windowed = map
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(10L)))
                .fold(0L, new FoldFunction<Long, Long>() {
                    @Override
                    public Long fold(Long accumulator, Long value) throws Exception {
                        return null;
                    }
                });
        windowed.addSink(new NoOpSink());

        DataStreamSink sink = map.addSink(new SinkFunction<Long>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Long value) throws Exception {
            }
        });

        Assert.assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
        Assert.assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
        Assert.assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
        Assert.assertEquals(10,
                env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());

        // A later change of the environment default must not touch existing operators.
        env.setParallelism(7);

        Assert.assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
        Assert.assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
        Assert.assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
        Assert.assertEquals(10,
                env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());

        try {
            src.setParallelism(3);
            Assert.fail();
        } catch (IllegalArgumentException expected) {
            // Expected: this source does not accept a parallelism of 3.
        }

        // A source created after the change picks up the new default.
        DataStreamSource parallelSource = env.generateSequence(0L, 0L);
        parallelSource.addSink(new NoOpSink());
        Assert.assertEquals(7,
                env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());

        parallelSource.setParallelism(3);
        Assert.assertEquals(3,
                env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());

        map.setParallelism(2);
        Assert.assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());

        sink.setParallelism(4);
        Assert.assertEquals(4,
                env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
    }

    /**
     * Checks the type information reported by a stream after each step of a small
     * pipeline: a {@code Long} sequence source, a map to {@code Tuple2<Integer, String>},
     * a global-window apply producing {@code String}, and a global-window fold
     * producing {@link CustomPOJO}.
     *
     * <p>Expected values are obtained from {@link TypeExtractor}. All user functions
     * are placeholders; nothing is executed.
     */
    @Test
    public void testTypeInfo() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> source = env.generateSequence(0L, 0L);
        Assert.assertEquals(TypeExtractor.getForClass(Long.class), source.getType());

        DataStream<Tuple2<Integer, String>> mapped =
                source.map(new MapFunction<Long, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(Long value) throws Exception {
                        return null;
                    }
                });
        Assert.assertEquals(
                TypeExtractor.getForObject(new Tuple2<Integer, String>(0, "")), mapped.getType());

        // Only the typed apply(...) is declared: the compiler generates the erased
        // bridge method itself, and declaring it by hand is an erasure name clash.
        DataStream<String> windowed = mapped
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(5L)))
                .apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
                    @Override
                    public void apply(
                            GlobalWindow window,
                            Iterable<Tuple2<Integer, String>> values,
                            Collector<String> out) throws Exception {
                    }
                });
        Assert.assertEquals(TypeExtractor.getForClass(String.class), windowed.getType());

        DataStream<CustomPOJO> folded = windowed
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(5L)))
                .fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
                        return null;
                    }
                });
        Assert.assertEquals(TypeExtractor.getForClass(CustomPOJO.class), folded.getType());
    }

    /**
     * Checks that user functions and stream wiring end up in the stream graph as
     * declared.
     *
     * <p>For map, flat-map, filter-after-union and co-map the test asserts that the
     * function found through {@code getFunctionForDataStream} is the instance that
     * was passed in, and that edges exist from each input to the consuming operator.
     * For split/select it asserts that the output selector is registered on the
     * split node and that the edge to a sink behind {@code select("a")} carries the
     * selected name {@code "a"}.
     */
    @Test
    @SuppressWarnings("unchecked") // generic varargs array created by DataStream#union
    public void operatorTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source = env.generateSequence(0L, 0L);

        // map
        MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>() {
            @Override
            public Integer map(Long value) throws Exception {
                return null;
            }
        };
        DataStream<Integer> map = source.map(mapFunction);
        map.addSink(new NoOpSink<Integer>());
        Assert.assertEquals(mapFunction, getFunctionForDataStream(map));

        // flatMap: only the typed method is declared; a hand-written erased bridge
        // method would be an erasure name clash with FlatMapFunction#flatMap.
        FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Long value, Collector<Integer> out) throws Exception {
            }
        };
        DataStream<Integer> flatMap = source.flatMap(flatMapFunction);
        flatMap.addSink(new NoOpSink<Integer>());
        Assert.assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));

        // filter over a union of both streams
        FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return false;
            }
        };
        DataStream<Integer> unionFilter = map.union(flatMap).filter(filterFunction);
        unionFilter.addSink(new NoOpSink<Integer>());
        Assert.assertEquals(filterFunction, getFunctionForDataStream(unionFilter));

        int mapId = map.getId().intValue();
        int flatMapId = flatMap.getId().intValue();
        int unionFilterId = unionFilter.getId().intValue();

        try {
            env.getStreamGraph().getStreamEdges(mapId, unionFilterId);
        } catch (RuntimeException e) {
            Assert.fail(e.getMessage());
        }
        try {
            env.getStreamGraph().getStreamEdges(flatMapId, unionFilterId);
        } catch (RuntimeException e) {
            Assert.fail(e.getMessage());
        }

        // split / select
        OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                return null;
            }
        };
        SplitStream<Integer> split = unionFilter.split(outputSelector);
        split.select("dummy").addSink(new NoOpSink<Integer>());

        List<?> outputSelectors =
                env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
        Assert.assertEquals(1, outputSelectors.size());
        Assert.assertEquals(outputSelector, outputSelectors.get(0));

        // The sink must exist BEFORE the graph is requested. Folding its creation
        // into the argument list of getStreamEdges(...) would evaluate
        // getStreamGraph() first and query a graph that does not contain the sink.
        DataStreamSink<Integer> selectedSink = split.select("a").print();
        int selectedSinkId = selectedSink.getTransformation().getId();
        StreamEdge splitEdge =
                (StreamEdge) env.getStreamGraph().getStreamEdges(unionFilterId, selectedSinkId).get(0);
        Assert.assertEquals("a", splitEdge.getSelectedNames().get(0));

        // co-map over the connected streams
        ConnectedStreams<Integer, Integer> connected = map.connect(flatMap);
        CoMapFunction<Integer, Integer, String> coMapFunction =
                new CoMapFunction<Integer, Integer, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String map1(Integer value) {
                        return null;
                    }

                    @Override
                    public String map2(Integer value) {
                        return null;
                    }
                };
        DataStream<String> coMap = connected.map(coMapFunction);
        coMap.addSink(new NoOpSink<String>());
        Assert.assertEquals(coMapFunction, getFunctionForDataStream(coMap));

        int coMapId = coMap.getId().intValue();
        try {
            env.getStreamGraph().getStreamEdges(mapId, coMapId);
        } catch (RuntimeException e) {
            Assert.fail(e.getMessage());
        }
        try {
            env.getStreamGraph().getStreamEdges(flatMapId, coMapId);
        } catch (RuntimeException e) {
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks the keying information the stream graph records for a print sink in three setups:
     * a sink on a non-keyed stream, and sinks on two streams keyed by separate
     * {@link KeySelector} instances.
     *
     * <p>For the non-keyed sink the node's state partitioner must be {@code null} and its first
     * in-edge must use a {@code ForwardPartitioner}. For each keyed sink the node's state
     * partitioner must be non-null and equal to the selector handed to {@code keyBy}, and its
     * first in-edge must use a {@code HashPartitioner}. The first keyed sink is additionally
     * required to expose a non-null state key serializer.
     */
    @Test
    public void sinkKeyTest() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // --- Sink attached directly to a non-keyed stream ---
        DataStreamSink<Long> unkeyedSink = executionEnvironment.generateSequence(1L, 100L).print();
        Integer unkeyedSinkId = Integer.valueOf(unkeyedSink.getTransformation().getId());
        // The graph is requested once per sink, after that sink has been created.
        StreamGraph unkeyedGraph = executionEnvironment.getStreamGraph();
        Assert.assertNull(unkeyedGraph.getStreamNode(unkeyedSinkId).getStatePartitioner());
        Assert.assertTrue(((StreamEdge) unkeyedGraph.getStreamNode(unkeyedSinkId).getInEdges().get(0)).getPartitioner() instanceof ForwardPartitioner);

        // --- Sink attached to a stream keyed by the first selector ---
        KeySelector<Long, Long> firstSelector = new KeySelector<Long, Long>() {
            private static final long serialVersionUID = 1;

            public Long getKey(Long l) throws Exception {
                return 0L;
            }
        };
        DataStreamSink<Long> firstKeyedSink = executionEnvironment.generateSequence(1L, 100L).keyBy(firstSelector).print();
        Integer firstKeyedSinkId = Integer.valueOf(firstKeyedSink.getTransformation().getId());
        StreamGraph firstKeyedGraph = executionEnvironment.getStreamGraph();
        Assert.assertNotNull(firstKeyedGraph.getStreamNode(firstKeyedSinkId).getStatePartitioner());
        Assert.assertNotNull(firstKeyedGraph.getStreamNode(firstKeyedSinkId).getStateKeySerializer());
        Assert.assertEquals(firstSelector, firstKeyedGraph.getStreamNode(firstKeyedSinkId).getStatePartitioner());
        Assert.assertTrue(((StreamEdge) firstKeyedGraph.getStreamNode(firstKeyedSinkId).getInEdges().get(0)).getPartitioner() instanceof HashPartitioner);

        // --- Sink attached to a stream keyed by a second, distinct selector instance ---
        KeySelector<Long, Long> secondSelector = new KeySelector<Long, Long>() {
            private static final long serialVersionUID = 1;

            public Long getKey(Long l) throws Exception {
                return 0L;
            }
        };
        DataStreamSink<Long> secondKeyedSink = executionEnvironment.generateSequence(1L, 100L).keyBy(secondSelector).print();
        Integer secondKeyedSinkId = Integer.valueOf(secondKeyedSink.getTransformation().getId());
        StreamGraph secondKeyedGraph = executionEnvironment.getStreamGraph();
        Assert.assertNotNull(secondKeyedGraph.getStreamNode(secondKeyedSinkId).getStatePartitioner());
        Assert.assertEquals(secondSelector, secondKeyedGraph.getStreamNode(secondKeyedSinkId).getStatePartitioner());
        Assert.assertTrue(((StreamEdge) secondKeyedGraph.getStreamNode(secondKeyedSinkId).getInEdges().get(0)).getPartitioner() instanceof HashPartitioner);
    }

    /**
     * Checks that each repartitioning call on a stream ({@code broadcast}, {@code shuffle},
     * {@code forward}, {@code rebalance}, {@code global}) yields the matching partitioner type
     * on the first edge between the source and a print sink attached to the repartitioned stream.
     *
     * <p>Every sink is created <em>before</em> the stream graph is requested. Java evaluates the
     * receiver of a method call ahead of its arguments, so creating the sink inside the
     * {@code getStreamEdges(...)} argument list would request the graph first and create the
     * sink only afterwards.
     */
    @Test
    public void testChannelSelectors() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source = executionEnvironment.generateSequence(0L, 0L);
        int sourceId = source.getId().intValue();

        int broadcastSinkId = source.broadcast().print().getTransformation().getId();
        StreamEdge broadcastEdge = (StreamEdge) executionEnvironment.getStreamGraph().getStreamEdges(sourceId, broadcastSinkId).get(0);
        Assert.assertTrue(broadcastEdge.getPartitioner() instanceof BroadcastPartitioner);

        int shuffleSinkId = source.shuffle().print().getTransformation().getId();
        StreamEdge shuffleEdge = (StreamEdge) executionEnvironment.getStreamGraph().getStreamEdges(sourceId, shuffleSinkId).get(0);
        Assert.assertTrue(shuffleEdge.getPartitioner() instanceof ShufflePartitioner);

        int forwardSinkId = source.forward().print().getTransformation().getId();
        StreamEdge forwardEdge = (StreamEdge) executionEnvironment.getStreamGraph().getStreamEdges(sourceId, forwardSinkId).get(0);
        Assert.assertTrue(forwardEdge.getPartitioner() instanceof ForwardPartitioner);

        int rebalanceSinkId = source.rebalance().print().getTransformation().getId();
        StreamEdge rebalanceEdge = (StreamEdge) executionEnvironment.getStreamGraph().getStreamEdges(sourceId, rebalanceSinkId).get(0);
        Assert.assertTrue(rebalanceEdge.getPartitioner() instanceof RebalancePartitioner);

        int globalSinkId = source.global().print().getTransformation().getId();
        StreamEdge globalEdge = (StreamEdge) executionEnvironment.getStreamGraph().getStreamEdges(sourceId, globalSinkId).get(0);
        Assert.assertTrue(globalEdge.getPartitioner() instanceof GlobalPartitioner);
    }

    /**
     * Fetches the operator stored in the stream graph node that belongs to the given stream.
     *
     * @param dataStream stream whose id selects the node; the graph is obtained from the
     *                   stream's own execution environment
     * @return the operator held by that stream node
     */
    private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
        StreamGraph streamGraph = dataStream.getExecutionEnvironment().getStreamGraph();
        return streamGraph.getStreamNode(dataStream.getId()).getOperator();
    }

    /**
     * Returns the user function of the operator registered for the given stream.
     *
     * @param dataStream stream whose operator is looked up via {@code getOperatorForDataStream}
     * @return the user function reported by that operator
     */
    private static Function getFunctionForDataStream(DataStream<?> dataStream) {
        StreamOperator<?> operator = getOperatorForDataStream(dataStream);
        return operator.getUserFunction();
    }

    /**
     * Attaches a print sink to the given stream and returns the id of the sink's transformation.
     *
     * @param dataStream stream that receives the new print sink
     * @return id of the transformation behind the newly created sink
     */
    private static Integer createDownStreamId(DataStream<?> dataStream) {
        DataStreamSink<?> printSink = dataStream.print();
        int sinkId = printSink.getTransformation().getId();
        return Integer.valueOf(sinkId);
    }

    /**
     * Tells whether the given stream is a {@link KeyedStream}.
     *
     * @param dataStream stream to inspect; {@code null} yields {@code false}
     * @return {@code true} if the stream is an instance of {@link KeyedStream}
     */
    private static boolean isKeyed(DataStream<?> dataStream) {
        return KeyedStream.class.isInstance(dataStream);
    }

    /**
     * Applies a co-map whose two mapping methods both return {@code null} to the connected
     * streams, attaches a {@code NoOpSink} to the result, and returns the id of the co-map stream.
     *
     * @param connectedStreams connected streams that receive the co-map
     * @return id of the stream produced by the co-map
     */
    private static Integer createDownStreamId(ConnectedStreams connectedStreams) {
        CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object> nullCoMap =
                new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
                    private static final long serialVersionUID = 1;

                    public Object map1(Tuple2<Long, Long> tuple2) {
                        return null;
                    }

                    public Object map2(Tuple2<Long, Long> tuple2) {
                        return null;
                    }
                };
        SingleOutputStreamOperator coMapped = connectedStreams.map(nullCoMap);
        coMapped.addSink(new NoOpSink());
        return coMapped.getId();
    }

    /**
     * Tells whether both inputs of the connected streams are {@link KeyedStream}s.
     *
     * @param connectedStreams connected streams to inspect
     * @return {@code true} only if the first and the second input are both keyed; the second
     *         input is not queried when the first one is not keyed
     */
    private static boolean isKeyed(ConnectedStreams<?, ?> connectedStreams) {
        if (!(connectedStreams.getFirstInput() instanceof KeyedStream)) {
            return false;
        }
        return connectedStreams.getSecondInput() instanceof KeyedStream;
    }

    /**
     * Tells whether every edge in the list uses a {@code HashPartitioner}.
     *
     * @param list edges to inspect; all of them are visited even after a mismatch is found
     * @return {@code true} if no edge has a partitioner of another type (also for an empty list)
     */
    private static boolean isPartitioned(List<StreamEdge> list) {
        boolean allHashPartitioned = true;
        for (StreamEdge edge : list) {
            // Non-short-circuit accumulation: the partitioner of every edge is still queried.
            allHashPartitioned &= edge.getPartitioner() instanceof HashPartitioner;
        }
        return allHashPartitioned;
    }

    /**
     * Tells whether every edge in the list uses a {@code CustomPartitionerWrapper}.
     *
     * @param list edges to inspect; all of them are visited even after a mismatch is found
     * @return {@code true} if no edge has a partitioner of another type (also for an empty list)
     */
    private static boolean isCustomPartitioned(List<StreamEdge> list) {
        boolean allCustomPartitioned = true;
        for (StreamEdge edge : list) {
            // Non-short-circuit accumulation: the partitioner of every edge is still queried.
            allCustomPartitioned &= edge.getPartitioner() instanceof CustomPartitionerWrapper;
        }
        return allCustomPartitioned;
    }
}
