package org.apache.flink.streaming.api;

import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DiscretizedStream;
import org.apache.flink.streaming.api.datastream.GroupedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest.class */
public class DataStreamTest {
    private static final long MEMORYSIZE = 32;
    private static int PARALLELISM = 1;

    /* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest$CustomPOJO.class */
    public static class CustomPOJO {
        private String s;
        private int i;

        public void setS(String str) {
            this.s = str;
        }

        public void setI(int i) {
            this.i = i;
        }

        public String getS() {
            return this.s;
        }

        public int getI() {
            return this.i;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/DataStreamTest$FirstSelector.class */
    private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
        private FirstSelector() {
        }

        public Long getKey(Tuple2<Long, Long> tuple2) throws Exception {
            return (Long) tuple2.f0;
        }
    }

    @Test
    public void testNaming() throws Exception {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
        testStreamEnvironment.generateSequence(0L, 0L).name("testSource1").map(new MapFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.1
            public Long map(Long l) throws Exception {
                return null;
            }
        }).name("testMap").connect(testStreamEnvironment.generateSequence(0L, 0L).name("testSource2").reduce(new ReduceFunction<Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.2
            public Long reduce(Long l, Long l2) throws Exception {
                return null;
            }
        }).name("testReduce")).flatMap(new CoFlatMapFunction<Long, Long, Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.4
            public void flatMap1(Long l, Collector<Long> collector) throws Exception {
            }

            public void flatMap2(Long l, Collector<Long> collector) throws Exception {
            }

            public /* bridge */ /* synthetic */ void flatMap2(Object obj, Collector collector) throws Exception {
                flatMap2((Long) obj, (Collector<Long>) collector);
            }

            public /* bridge */ /* synthetic */ void flatMap1(Object obj, Collector collector) throws Exception {
                flatMap1((Long) obj, (Collector<Long>) collector);
            }
        }).name("testCoFlatMap").window(Count.of(10)).foldWindow(0L, new FoldFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.3
            public Long fold(Long l, Long l2) throws Exception {
                return null;
            }
        }).name("testWindowFold").flatten();
        String executionPlan = testStreamEnvironment.getExecutionPlan();
        Assert.assertTrue(executionPlan.contains("testSource1"));
        Assert.assertTrue(executionPlan.contains("testSource2"));
        Assert.assertTrue(executionPlan.contains("testMap"));
        Assert.assertTrue(executionPlan.contains("testReduce"));
        Assert.assertTrue(executionPlan.contains("testCoFlatMap"));
        Assert.assertTrue(executionPlan.contains("testWindowFold"));
    }

    @Test
    public void testPartitioning() {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
        StreamGraph streamGraph = testStreamEnvironment.getStreamGraph();
        DataStreamSource fromElements = testStreamEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        ConnectedDataStream connect = fromElements.connect(testStreamEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}));
        GroupedDataStream groupBy = fromElements.groupBy(new int[]{0});
        GroupedDataStream groupBy2 = fromElements.groupBy(new int[]{1, 0});
        GroupedDataStream groupBy3 = fromElements.groupBy(new String[]{"f0"});
        GroupedDataStream groupBy4 = fromElements.groupBy(new FirstSelector());
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy.getId().intValue(), createDownStreamId((DataStream) groupBy).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy2.getId().intValue(), createDownStreamId((DataStream) groupBy2).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy3.getId().intValue(), createDownStreamId((DataStream) groupBy3).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy4.getId().intValue(), createDownStreamId((DataStream) groupBy4).intValue())));
        Assert.assertTrue(isGrouped((DataStream) groupBy));
        Assert.assertTrue(isGrouped((DataStream) groupBy2));
        Assert.assertTrue(isGrouped((DataStream) groupBy3));
        Assert.assertTrue(isGrouped((DataStream) groupBy4));
        DataStream partitionByHash = fromElements.partitionByHash(new int[]{0});
        DataStream partitionByHash2 = fromElements.partitionByHash(new int[]{1, 0});
        DataStream partitionByHash3 = fromElements.partitionByHash(new String[]{"f0"});
        DataStream partitionByHash4 = fromElements.partitionByHash(new FirstSelector());
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash.getId().intValue(), createDownStreamId(partitionByHash).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash2.getId().intValue(), createDownStreamId(partitionByHash2).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash3.getId().intValue(), createDownStreamId(partitionByHash3).intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash4.getId().intValue(), createDownStreamId(partitionByHash4).intValue())));
        Assert.assertFalse(isGrouped(partitionByHash));
        Assert.assertFalse(isGrouped(partitionByHash3));
        Assert.assertFalse(isGrouped(partitionByHash2));
        Assert.assertFalse(isGrouped(partitionByHash4));
        ConnectedDataStream groupBy5 = connect.groupBy(0, 0);
        Integer createDownStreamId = createDownStreamId(groupBy5);
        ConnectedDataStream groupBy6 = connect.groupBy(new int[]{0}, new int[]{0});
        Integer createDownStreamId2 = createDownStreamId(groupBy6);
        ConnectedDataStream groupBy7 = connect.groupBy("f0", "f0");
        Integer createDownStreamId3 = createDownStreamId(groupBy7);
        ConnectedDataStream groupBy8 = connect.groupBy(new String[]{"f0"}, new String[]{"f0"});
        Integer createDownStreamId4 = createDownStreamId(groupBy8);
        ConnectedDataStream groupBy9 = connect.groupBy(new FirstSelector(), new FirstSelector());
        Integer createDownStreamId5 = createDownStreamId(groupBy9);
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy5.getFirst().getId().intValue(), createDownStreamId.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy5.getSecond().getId().intValue(), createDownStreamId.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy6.getFirst().getId().intValue(), createDownStreamId2.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy6.getSecond().getId().intValue(), createDownStreamId2.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy7.getFirst().getId().intValue(), createDownStreamId3.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy7.getSecond().getId().intValue(), createDownStreamId3.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy8.getFirst().getId().intValue(), createDownStreamId4.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy8.getSecond().getId().intValue(), createDownStreamId4.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy9.getFirst().getId().intValue(), createDownStreamId5.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(groupBy9.getSecond().getId().intValue(), createDownStreamId5.intValue())));
        Assert.assertTrue(isGrouped(groupBy5));
        Assert.assertTrue(isGrouped(groupBy6));
        Assert.assertTrue(isGrouped(groupBy7));
        Assert.assertTrue(isGrouped(groupBy8));
        Assert.assertTrue(isGrouped(groupBy9));
        ConnectedDataStream partitionByHash5 = connect.partitionByHash(0, 0);
        Integer createDownStreamId6 = createDownStreamId(partitionByHash5);
        ConnectedDataStream partitionByHash6 = connect.partitionByHash(new int[]{0}, new int[]{0});
        Integer createDownStreamId7 = createDownStreamId(partitionByHash6);
        ConnectedDataStream partitionByHash7 = connect.partitionByHash("f0", "f0");
        Integer createDownStreamId8 = createDownStreamId(partitionByHash7);
        ConnectedDataStream partitionByHash8 = connect.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
        Integer createDownStreamId9 = createDownStreamId(partitionByHash8);
        ConnectedDataStream partitionByHash9 = connect.partitionByHash(new FirstSelector(), new FirstSelector());
        Integer createDownStreamId10 = createDownStreamId(partitionByHash9);
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash5.getFirst().getId().intValue(), createDownStreamId6.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash5.getSecond().getId().intValue(), createDownStreamId6.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash6.getFirst().getId().intValue(), createDownStreamId7.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash6.getSecond().getId().intValue(), createDownStreamId7.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash7.getFirst().getId().intValue(), createDownStreamId8.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash7.getSecond().getId().intValue(), createDownStreamId8.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash8.getFirst().getId().intValue(), createDownStreamId9.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash8.getSecond().getId().intValue(), createDownStreamId9.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash9.getFirst().getId().intValue(), createDownStreamId10.intValue())));
        Assert.assertTrue(isPartitioned(streamGraph.getStreamEdge(partitionByHash9.getSecond().getId().intValue(), createDownStreamId10.intValue())));
        Assert.assertFalse(isGrouped(partitionByHash5));
        Assert.assertFalse(isGrouped(partitionByHash6));
        Assert.assertFalse(isGrouped(partitionByHash7));
        Assert.assertFalse(isGrouped(partitionByHash8));
        Assert.assertFalse(isGrouped(partitionByHash9));
    }

    @Test
    public void testParallelism() {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(10, MEMORYSIZE);
        StreamGraph streamGraph = testStreamEnvironment.getStreamGraph();
        DataStreamSource fromElements = testStreamEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        SingleOutputStreamOperator map = fromElements.map(new MapFunction<Tuple2<Long, Long>, Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.5
            public Long map(Tuple2<Long, Long> tuple2) throws Exception {
                return null;
            }
        });
        DataStream flatten = map.window(Count.of(10)).foldWindow(0L, new FoldFunction<Long, Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.6
            public Long fold(Long l, Long l2) throws Exception {
                return null;
            }
        }).flatten();
        DataStreamSink addSink = map.addSink(new SinkFunction<Long>() { // from class: org.apache.flink.streaming.api.DataStreamTest.7
            public void invoke(Long l) throws Exception {
            }
        });
        Assert.assertEquals(1L, streamGraph.getStreamNode(fromElements.getId()).getParallelism());
        Assert.assertEquals(10L, streamGraph.getStreamNode(map.getId()).getParallelism());
        Assert.assertEquals(10L, streamGraph.getStreamNode(flatten.getId()).getParallelism());
        Assert.assertEquals(10L, streamGraph.getStreamNode(addSink.getId()).getParallelism());
        testStreamEnvironment.setParallelism(7);
        Assert.assertEquals(1L, streamGraph.getStreamNode(fromElements.getId()).getParallelism());
        Assert.assertEquals(7L, streamGraph.getStreamNode(map.getId()).getParallelism());
        Assert.assertEquals(7L, streamGraph.getStreamNode(flatten.getId()).getParallelism());
        Assert.assertEquals(7L, streamGraph.getStreamNode(addSink.getId()).getParallelism());
        try {
            fromElements.setParallelism(3);
            Assert.fail();
        } catch (IllegalArgumentException e) {
        }
        DataStreamSource generateParallelSequence = testStreamEnvironment.generateParallelSequence(0L, 0L);
        Assert.assertEquals(7L, streamGraph.getStreamNode(generateParallelSequence.getId()).getParallelism());
        generateParallelSequence.setParallelism(3);
        Assert.assertEquals(3L, streamGraph.getStreamNode(generateParallelSequence.getId()).getParallelism());
        map.setParallelism(2);
        Assert.assertEquals(2L, streamGraph.getStreamNode(map.getId()).getParallelism());
        addSink.setParallelism(4);
        Assert.assertEquals(4L, streamGraph.getStreamNode(addSink.getId()).getParallelism());
    }

    @Test
    public void testTypeInfo() {
        DataStreamSource generateSequence = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE).generateSequence(0L, 0L);
        Assert.assertEquals(TypeExtractor.getForClass(Long.class), generateSequence.getType());
        SingleOutputStreamOperator map = generateSequence.map(new MapFunction<Long, Tuple2<Integer, String>>() { // from class: org.apache.flink.streaming.api.DataStreamTest.8
            public Tuple2<Integer, String> map(Long l) throws Exception {
                return null;
            }
        });
        Assert.assertEquals(TypeExtractor.getForObject(new Tuple2(0, "")), map.getType());
        DiscretizedStream mapWindow = map.window(Count.of(5)).mapWindow(new WindowMapFunction<Tuple2<Integer, String>, String>() { // from class: org.apache.flink.streaming.api.DataStreamTest.9
            public void mapWindow(Iterable<Tuple2<Integer, String>> iterable, Collector<String> collector) throws Exception {
            }
        });
        Assert.assertEquals(TypeExtractor.getForClass(String.class), mapWindow.getType());
        Assert.assertEquals(TypeExtractor.getForClass(CustomPOJO.class), mapWindow.foldWindow(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() { // from class: org.apache.flink.streaming.api.DataStreamTest.10
            public CustomPOJO fold(CustomPOJO customPOJO, String str) throws Exception {
                return null;
            }
        }).flatten().getType());
    }

    @Test
    public void operatorTest() {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
        StreamGraph streamGraph = testStreamEnvironment.getStreamGraph();
        DataStreamSource generateSequence = testStreamEnvironment.generateSequence(0L, 0L);
        MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>() { // from class: org.apache.flink.streaming.api.DataStreamTest.11
            public Integer map(Long l) throws Exception {
                return null;
            }
        };
        SingleOutputStreamOperator map = generateSequence.map(mapFunction);
        Assert.assertEquals(mapFunction, getFunctionForDataStream(map));
        FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() { // from class: org.apache.flink.streaming.api.DataStreamTest.12
            public void flatMap(Long l, Collector<Integer> collector) throws Exception {
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Long) obj, (Collector<Integer>) collector);
            }
        };
        DataStream flatMap = generateSequence.flatMap(flatMapFunction);
        Assert.assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));
        FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() { // from class: org.apache.flink.streaming.api.DataStreamTest.13
            public boolean filter(Integer num) throws Exception {
                return false;
            }
        };
        SingleOutputStreamOperator filter = map.union(new DataStream[]{flatMap}).filter(filterFunction);
        Assert.assertEquals(filterFunction, getFunctionForDataStream(filter));
        try {
            streamGraph.getStreamEdge(map.getId().intValue(), filter.getId().intValue());
        } catch (RuntimeException e) {
            Assert.fail(e.getMessage());
        }
        try {
            streamGraph.getStreamEdge(flatMap.getId().intValue(), filter.getId().intValue());
        } catch (RuntimeException e2) {
            Assert.fail(e2.getMessage());
        }
        OutputSelector<Integer> outputSelector = new OutputSelector<Integer>() { // from class: org.apache.flink.streaming.api.DataStreamTest.14
            public Iterable<String> select(Integer num) {
                return null;
            }
        };
        SplitDataStream split = filter.split(outputSelector);
        List outputSelectors = streamGraph.getStreamNode(split.getId()).getOutputSelectors();
        Assert.assertEquals(1L, outputSelectors.size());
        Assert.assertEquals(outputSelector, outputSelectors.get(0));
        DataStream select = split.select(new String[]{"a"});
        Assert.assertEquals("a", streamGraph.getStreamEdge(select.getId().intValue(), select.print().getId().intValue()).getSelectedNames().get(0));
        FoldFunction<Integer, String> foldFunction = new FoldFunction<Integer, String>() { // from class: org.apache.flink.streaming.api.DataStreamTest.15
            public String fold(String str, Integer num) throws Exception {
                return null;
            }
        };
        SingleOutputStreamOperator fold = map.fold("", foldFunction);
        Assert.assertEquals(foldFunction, getFunctionForDataStream(fold));
        ConnectedDataStream connect = fold.connect(flatMap);
        CoMapFunction<String, Integer, String> coMapFunction = new CoMapFunction<String, Integer, String>() { // from class: org.apache.flink.streaming.api.DataStreamTest.16
            public String map1(String str) {
                return null;
            }

            public String map2(Integer num) {
                return null;
            }
        };
        SingleOutputStreamOperator map2 = connect.map(coMapFunction);
        Assert.assertEquals(coMapFunction, getFunctionForDataStream(map2));
        try {
            streamGraph.getStreamEdge(fold.getId().intValue(), map2.getId().intValue());
        } catch (RuntimeException e3) {
            Assert.fail(e3.getMessage());
        }
        try {
            streamGraph.getStreamEdge(flatMap.getId().intValue(), map2.getId().intValue());
        } catch (RuntimeException e4) {
            Assert.fail(e4.getMessage());
        }
    }

    @Test
    public void testChannelSelectors() {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
        StreamGraph streamGraph = testStreamEnvironment.getStreamGraph();
        DataStreamSource generateSequence = testStreamEnvironment.generateSequence(0L, 0L);
        SingleOutputStreamOperator broadcast = generateSequence.broadcast();
        Assert.assertTrue(streamGraph.getStreamEdge(broadcast.getId().intValue(), broadcast.print().getId().intValue()).getPartitioner() instanceof BroadcastPartitioner);
        SingleOutputStreamOperator shuffle = generateSequence.shuffle();
        Assert.assertTrue(streamGraph.getStreamEdge(shuffle.getId().intValue(), shuffle.print().getId().intValue()).getPartitioner() instanceof ShufflePartitioner);
        SingleOutputStreamOperator forward = generateSequence.forward();
        Assert.assertTrue(streamGraph.getStreamEdge(forward.getId().intValue(), forward.print().getId().intValue()).getPartitioner() instanceof RebalancePartitioner);
        SingleOutputStreamOperator rebalance = generateSequence.rebalance();
        Assert.assertTrue(streamGraph.getStreamEdge(rebalance.getId().intValue(), rebalance.print().getId().intValue()).getPartitioner() instanceof RebalancePartitioner);
        SingleOutputStreamOperator global = generateSequence.global();
        Assert.assertTrue(streamGraph.getStreamEdge(global.getId().intValue(), global.print().getId().intValue()).getPartitioner() instanceof GlobalPartitioner);
    }

    private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
        return dataStream.getExecutionEnvironment().getStreamGraph().getStreamNode(dataStream.getId()).getOperator();
    }

    private static Function getFunctionForDataStream(DataStream<?> dataStream) {
        return getOperatorForDataStream(dataStream).getUserFunction();
    }

    private static Integer createDownStreamId(DataStream dataStream) {
        return dataStream.print().getId();
    }

    private static boolean isGrouped(DataStream dataStream) {
        return dataStream instanceof GroupedDataStream;
    }

    private static Integer createDownStreamId(ConnectedDataStream connectedDataStream) {
        return connectedDataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() { // from class: org.apache.flink.streaming.api.DataStreamTest.17
            public Object map1(Tuple2<Long, Long> tuple2) {
                return null;
            }

            public Object map2(Tuple2<Long, Long> tuple2) {
                return null;
            }
        }).getId();
    }

    private static boolean isGrouped(ConnectedDataStream connectedDataStream) {
        return (connectedDataStream.getFirst() instanceof GroupedDataStream) && (connectedDataStream.getSecond() instanceof GroupedDataStream);
    }

    private static boolean isPartitioned(StreamEdge streamEdge) {
        return streamEdge.getPartitioner() instanceof FieldsPartitioner;
    }
}
