/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.StringStartsWith;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class DataStreamTest
extends TestLogger {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testUnion() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        SingleOutputStreamOperator input1 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator selfUnion = input1.union(new DataStream[]{input1}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator input6 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator selfUnionDifferentPartition = input6.broadcast().union(new DataStream[]{input6}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator input2 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator input3 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        SingleOutputStreamOperator unionDifferingParallelism = input2.union(new DataStream[]{input3}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator input4 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        SingleOutputStreamOperator input5 = env.generateSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator unionDifferingPartitioning = input4.broadcast().union(new DataStream[]{input5}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        StreamGraph streamGraph = env.getStreamGraph();
        Assert.assertTrue((streamGraph.getStreamNode(Integer.valueOf(selfUnion.getId())).getInEdges().size() == 2 ? 1 : 0) != 0);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(selfUnion.getId())).getInEdges()) {
            Assert.assertTrue((boolean)(edge.getPartitioner() instanceof ForwardPartitioner));
        }
        Assert.assertTrue((streamGraph.getStreamNode(Integer.valueOf(selfUnionDifferentPartition.getId())).getInEdges().size() == 2 ? 1 : 0) != 0);
        boolean hasForward = false;
        boolean hasBroadcast = false;
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(selfUnionDifferentPartition.getId())).getInEdges()) {
            if (edge.getPartitioner() instanceof ForwardPartitioner) {
                hasForward = true;
            }
            if (!(edge.getPartitioner() instanceof BroadcastPartitioner)) continue;
            hasBroadcast = true;
        }
        Assert.assertTrue((hasForward && hasBroadcast ? 1 : 0) != 0);
        Assert.assertTrue((streamGraph.getStreamNode(Integer.valueOf(unionDifferingParallelism.getId())).getInEdges().size() == 2 ? 1 : 0) != 0);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(unionDifferingParallelism.getId())).getInEdges()) {
            if (edge.getSourceId() == input2.getId()) {
                Assert.assertTrue((boolean)(edge.getPartitioner() instanceof ForwardPartitioner));
                continue;
            }
            if (edge.getSourceId() == input3.getId()) {
                Assert.assertTrue((boolean)(edge.getPartitioner() instanceof RebalancePartitioner));
                continue;
            }
            Assert.fail((String)"Wrong input edge.");
        }
        Assert.assertTrue((streamGraph.getStreamNode(Integer.valueOf(unionDifferingPartitioning.getId())).getInEdges().size() == 2 ? 1 : 0) != 0);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(unionDifferingPartitioning.getId())).getInEdges()) {
            if (edge.getSourceId() == input4.getId()) {
                Assert.assertTrue((boolean)(edge.getPartitioner() instanceof BroadcastPartitioner));
                continue;
            }
            if (edge.getSourceId() == input5.getId()) {
                Assert.assertTrue((boolean)(edge.getPartitioner() instanceof ForwardPartitioner));
                continue;
            }
            Assert.fail((String)"Wrong input edge.");
        }
    }

    @Test
    public void testNaming() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator dataStream1 = env.generateSequence(0L, 0L).name("testSource1").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap");
        SingleOutputStreamOperator dataStream2 = env.generateSequence(0L, 0L).name("testSource2").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap");
        dataStream1.connect((DataStream)dataStream2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Long, Long, Long>(){

            public void flatMap1(Long value, Collector<Long> out) throws Exception {
            }

            public void flatMap2(Long value, Collector<Long> out) throws Exception {
            }
        }).name("testCoFlatMap").windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).fold((Object)0L, (FoldFunction)new FoldFunction<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long fold(Long accumulator, Long value) throws Exception {
                return null;
            }
        }).name("testWindowFold").print();
        String plan = env.getExecutionPlan();
        Assert.assertTrue((boolean)plan.contains("testSource1"));
        Assert.assertTrue((boolean)plan.contains("testSource2"));
        Assert.assertTrue((boolean)plan.contains("testMap"));
        Assert.assertTrue((boolean)plan.contains("testMap"));
        Assert.assertTrue((boolean)plan.contains("testCoFlatMap"));
        Assert.assertTrue((boolean)plan.contains("testWindowFold"));
    }

    @Test
    public void testPartitioning() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataStreamSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        ConnectedStreams connected = src1.connect((DataStream)src2);
        KeyedStream group1 = src1.keyBy(new int[]{0});
        KeyedStream group2 = src1.keyBy(new int[]{1, 0});
        KeyedStream group3 = src1.keyBy(new String[]{"f0"});
        KeyedStream group4 = src1.keyBy((KeySelector)new FirstSelector());
        int id1 = DataStreamTest.createDownStreamId(group1);
        int id2 = DataStreamTest.createDownStreamId(group2);
        int id3 = DataStreamTest.createDownStreamId(group3);
        int id4 = DataStreamTest.createDownStreamId(group4);
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), id1)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), id2)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), id3)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), id4)));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(group1));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(group2));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(group3));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(group4));
        KeyedStream partition1 = src1.keyBy(new int[]{0});
        KeyedStream partition2 = src1.keyBy(new int[]{1, 0});
        KeyedStream partition3 = src1.keyBy(new String[]{"f0"});
        KeyedStream partition4 = src1.keyBy((KeySelector)new FirstSelector());
        int pid1 = DataStreamTest.createDownStreamId(partition1);
        int pid2 = DataStreamTest.createDownStreamId(partition2);
        int pid3 = DataStreamTest.createDownStreamId(partition3);
        int pid4 = DataStreamTest.createDownStreamId(partition4);
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid1)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid2)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid3)));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid4)));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(partition1));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(partition3));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(partition2));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(partition4));
        Partitioner<Long> longPartitioner = new Partitioner<Long>(){

            public int partition(Long key, int numPartitions) {
                return 100;
            }
        };
        DataStream customPartition1 = src1.partitionCustom((Partitioner)longPartitioner, 0);
        DataStream customPartition3 = src1.partitionCustom((Partitioner)longPartitioner, "f0");
        DataStream customPartition4 = src1.partitionCustom((Partitioner)longPartitioner, (KeySelector)new FirstSelector());
        int cid1 = DataStreamTest.createDownStreamId(customPartition1);
        int cid2 = DataStreamTest.createDownStreamId(customPartition3);
        int cid3 = DataStreamTest.createDownStreamId(customPartition4);
        Assert.assertTrue((boolean)DataStreamTest.isCustomPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), cid1)));
        Assert.assertTrue((boolean)DataStreamTest.isCustomPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), cid2)));
        Assert.assertTrue((boolean)DataStreamTest.isCustomPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), cid3)));
        Assert.assertFalse((boolean)DataStreamTest.isKeyed(customPartition1));
        Assert.assertFalse((boolean)DataStreamTest.isKeyed(customPartition3));
        Assert.assertFalse((boolean)DataStreamTest.isKeyed(customPartition4));
        ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
        Integer downStreamId1 = DataStreamTest.createDownStreamId(connectedGroup1);
        ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
        Integer downStreamId2 = DataStreamTest.createDownStreamId(connectedGroup2);
        ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
        Integer downStreamId3 = DataStreamTest.createDownStreamId(connectedGroup3);
        ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
        Integer downStreamId4 = DataStreamTest.createDownStreamId(connectedGroup4);
        ConnectedStreams connectedGroup5 = connected.keyBy((KeySelector)new FirstSelector(), (KeySelector)new FirstSelector());
        Integer downStreamId5 = DataStreamTest.createDownStreamId(connectedGroup5);
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), downStreamId1.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), downStreamId1.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), downStreamId2.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), downStreamId2.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), downStreamId3.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), downStreamId3.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), downStreamId4.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), downStreamId4.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), downStreamId5.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), downStreamId5.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedGroup1));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedGroup2));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedGroup3));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedGroup4));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedGroup5));
        ConnectedStreams connectedPartition1 = connected.keyBy(0, 0);
        Integer connectDownStreamId1 = DataStreamTest.createDownStreamId(connectedPartition1);
        ConnectedStreams connectedPartition2 = connected.keyBy(new int[]{0}, new int[]{0});
        Integer connectDownStreamId2 = DataStreamTest.createDownStreamId(connectedPartition2);
        ConnectedStreams connectedPartition3 = connected.keyBy("f0", "f0");
        Integer connectDownStreamId3 = DataStreamTest.createDownStreamId(connectedPartition3);
        ConnectedStreams connectedPartition4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
        Integer connectDownStreamId4 = DataStreamTest.createDownStreamId(connectedPartition4);
        ConnectedStreams connectedPartition5 = connected.keyBy((KeySelector)new FirstSelector(), (KeySelector)new FirstSelector());
        Integer connectDownStreamId5 = DataStreamTest.createDownStreamId(connectedPartition5);
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), connectDownStreamId1.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId1.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), connectDownStreamId2.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId2.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), connectDownStreamId3.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId3.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), connectDownStreamId4.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId4.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), connectDownStreamId5.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId5.intValue())));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedPartition1));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedPartition2));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedPartition3));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedPartition4));
        Assert.assertTrue((boolean)DataStreamTest.isKeyed(connectedPartition5));
    }

    @Test
    public void testParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        env.setParallelism(10);
        SingleOutputStreamOperator map = src.map((MapFunction)new MapFunction<Tuple2<Long, Long>, Long>(){

            public Long map(Tuple2<Long, Long> value) throws Exception {
                return null;
            }
        }).name("MyMap");
        SingleOutputStreamOperator windowed = map.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).fold((Object)0L, (FoldFunction)new FoldFunction<Long, Long>(){

            public Long fold(Long accumulator, Long value) throws Exception {
                return null;
            }
        });
        windowed.addSink((SinkFunction)new DiscardingSink());
        DataStreamSink sink = map.addSink((SinkFunction)new SinkFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public void invoke(Long value) throws Exception {
            }
        });
        Assert.assertEquals((long)1L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(src.getId())).getParallelism());
        Assert.assertEquals((long)10L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(map.getId())).getParallelism());
        Assert.assertEquals((long)1L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(windowed.getId())).getParallelism());
        Assert.assertEquals((long)10L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism());
        env.setParallelism(7);
        Assert.assertEquals((long)1L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(src.getId())).getParallelism());
        Assert.assertEquals((long)10L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(map.getId())).getParallelism());
        Assert.assertEquals((long)1L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(windowed.getId())).getParallelism());
        Assert.assertEquals((long)10L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism());
        try {
            src.setParallelism(3);
            Assert.fail();
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        DataStreamSource parallelSource = env.generateSequence(0L, 0L);
        parallelSource.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((long)7L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(parallelSource.getId())).getParallelism());
        parallelSource.setParallelism(3);
        Assert.assertEquals((long)3L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(parallelSource.getId())).getParallelism());
        map.setParallelism(2);
        Assert.assertEquals((long)2L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(map.getId())).getParallelism());
        sink.setParallelism(4);
        Assert.assertEquals((long)4L, (long)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism());
    }

    @Test
    public void testResources() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ResourceSpec minResource1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
        ResourceSpec preferredResource1 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
        ResourceSpec minResource2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(200).build();
        ResourceSpec preferredResource2 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(300).build();
        ResourceSpec minResource3 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(300).build();
        ResourceSpec preferredResource3 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(400).build();
        ResourceSpec minResource4 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(400).build();
        ResourceSpec preferredResource4 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(500).build();
        ResourceSpec minResource5 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(500).build();
        ResourceSpec preferredResource5 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(600).build();
        ResourceSpec minResource6 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(600).build();
        ResourceSpec preferredResource6 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(700).build();
        ResourceSpec minResource7 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(700).build();
        ResourceSpec preferredResource7 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(800).build();
        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
        sinkMethod.setAccessible(true);
        DataStreamSource source1 = env.generateSequence(0L, 0L);
        opMethod.invoke((Object)source1, minResource1, preferredResource1);
        SingleOutputStreamOperator map1 = source1.map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)map1, minResource2, preferredResource2);
        DataStreamSource source2 = env.generateSequence(0L, 0L);
        opMethod.invoke((Object)source2, minResource3, preferredResource3);
        SingleOutputStreamOperator map2 = source2.map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)map2, minResource4, preferredResource4);
        SingleOutputStreamOperator connected = map1.connect((DataStream)map2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Long, Long, Long>(){

            public void flatMap1(Long value, Collector<Long> out) throws Exception {
            }

            public void flatMap2(Long value, Collector<Long> out) throws Exception {
            }
        });
        opMethod.invoke((Object)connected, minResource5, preferredResource5);
        SingleOutputStreamOperator windowed = connected.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).fold((Object)0L, (FoldFunction)new FoldFunction<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long fold(Long accumulator, Long value) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)windowed, minResource6, preferredResource6);
        DataStreamSink sink = windowed.print();
        sinkMethod.invoke((Object)sink, minResource7, preferredResource7);
        Assert.assertEquals((Object)minResource1, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(source1.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource1, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(source1.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource2, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(map1.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource2, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(map1.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource3, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(source2.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource3, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(source2.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource4, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(map2.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource4, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(map2.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource5, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(connected.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource5, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(connected.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource6, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(windowed.getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource6, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(windowed.getId())).getPreferredResources());
        Assert.assertEquals((Object)minResource7, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getMinResources());
        Assert.assertEquals((Object)preferredResource7, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getPreferredResources());
    }

    @Test
    public void testTypeInfo() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src1 = env.generateSequence(0L, 0L);
        Assert.assertEquals((Object)TypeExtractor.getForClass(Long.class), (Object)src1.getType());
        SingleOutputStreamOperator map = src1.map((MapFunction)new MapFunction<Long, Tuple2<Integer, String>>(){

            public Tuple2<Integer, String> map(Long value) throws Exception {
                return null;
            }
        });
        Assert.assertEquals((Object)TypeExtractor.getForObject((Object)new Tuple2((Object)0, (Object)"")), (Object)map.getType());
        SingleOutputStreamOperator window = map.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)5L))).apply((AllWindowFunction)new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>(){

            public void apply(GlobalWindow window, Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
            }
        });
        Assert.assertEquals((Object)TypeExtractor.getForClass(String.class), (Object)window.getType());
        SingleOutputStreamOperator flatten = window.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)5L))).fold((Object)new CustomPOJO(), (FoldFunction)new FoldFunction<String, CustomPOJO>(){
            private static final long serialVersionUID = 1L;

            public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
                return null;
            }
        });
        Assert.assertEquals((Object)TypeExtractor.getForClass(CustomPOJO.class), (Object)flatten.getType());
    }

    @Test
    @Deprecated
    public void testKeyedStreamProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.generateSequence(0L, 0L);
        ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, ProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, ProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.keyBy(new IdentityKeySelector()).process((ProcessFunction)processFunction);
        processed.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)processFunction, (Object)DataStreamTest.getFunctionForDataStream(processed));
        Assert.assertTrue((boolean)(DataStreamTest.getOperatorForDataStream(processed) instanceof LegacyKeyedProcessOperator));
    }

    @Test
    public void testKeyedStreamKeyedProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.generateSequence(0L, 0L);
        KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.keyBy(new IdentityKeySelector()).process((KeyedProcessFunction)keyedProcessFunction);
        processed.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)keyedProcessFunction, (Object)DataStreamTest.getFunctionForDataStream(processed));
        Assert.assertTrue((boolean)(DataStreamTest.getOperatorForDataStream(processed) instanceof KeyedProcessOperator));
    }

    @Test
    public void testProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.generateSequence(0L, 0L);
        ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, ProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, ProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.process((ProcessFunction)processFunction);
        processed.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)processFunction, (Object)DataStreamTest.getFunctionForDataStream(processed));
        Assert.assertTrue((boolean)(DataStreamTest.getOperatorForDataStream(processed) instanceof ProcessOperator));
    }

    @Test
    public void testFailedTranslationOnKeyed() {
        MapStateDescriptor descriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream srcOne = env.generateSequence(0L, 5L).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new CustomWmEmitter<Long>(){

            public long extractTimestamp(Long element, long previousElementTimestamp) {
                return element;
            }
        }).keyBy((KeySelector & Serializable)value -> value);
        SingleOutputStreamOperator srcTwo = env.fromElements((Object[])new String[]{"Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5"}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new CustomWmEmitter<String>(){

            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(":")[1]);
            }
        });
        BroadcastStream broadcast = srcTwo.broadcast(new MapStateDescriptor[]{descriptor});
        BroadcastConnectedStream bcStream = srcOne.connect(broadcast);
        this.expectedException.expect(IllegalArgumentException.class);
        bcStream.process((BroadcastProcessFunction)new BroadcastProcessFunction<Long, String, String>(){

            public void processBroadcastElement(String value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            }

            public void processElement(Long value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            }
        });
    }

    @Test
    public void testFailedTranslationOnNonKeyed() {
        MapStateDescriptor descriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator srcOne = env.generateSequence(0L, 5L).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new CustomWmEmitter<Long>(){

            public long extractTimestamp(Long element, long previousElementTimestamp) {
                return element;
            }
        });
        SingleOutputStreamOperator srcTwo = env.fromElements((Object[])new String[]{"Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5"}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new CustomWmEmitter<String>(){

            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(":")[1]);
            }
        });
        BroadcastStream broadcast = srcTwo.broadcast(new MapStateDescriptor[]{descriptor});
        BroadcastConnectedStream bcStream = srcOne.connect(broadcast);
        this.expectedException.expect(IllegalArgumentException.class);
        bcStream.process((KeyedBroadcastProcessFunction)new KeyedBroadcastProcessFunction<String, Long, String, String>(){

            public void processBroadcastElement(String value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            }

            public void processElement(Long value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            }
        });
    }

    @Test
    public void operatorTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.generateSequence(0L, 0L);
        MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>(){

            public Integer map(Long value) throws Exception {
                return null;
            }
        };
        SingleOutputStreamOperator map = src.map((MapFunction)mapFunction);
        map.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)mapFunction, (Object)DataStreamTest.getFunctionForDataStream(map));
        FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void flatMap(Long value, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator flatMap = src.flatMap((FlatMapFunction)flatMapFunction);
        flatMap.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)flatMapFunction, (Object)DataStreamTest.getFunctionForDataStream(flatMap));
        FilterFunction<Integer> filterFunction = new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return false;
            }
        };
        SingleOutputStreamOperator unionFilter = map.union(new DataStream[]{flatMap}).filter((FilterFunction)filterFunction);
        unionFilter.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)filterFunction, (Object)DataStreamTest.getFunctionForDataStream(unionFilter));
        try {
            env.getStreamGraph().getStreamEdges(map.getId(), unionFilter.getId());
        }
        catch (RuntimeException e) {
            Assert.fail((String)e.getMessage());
        }
        try {
            env.getStreamGraph().getStreamEdges(flatMap.getId(), unionFilter.getId());
        }
        catch (RuntimeException e) {
            Assert.fail((String)e.getMessage());
        }
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        SplitStream split = unionFilter.split(outputSelector);
        split.select(new String[]{"dummy"}).addSink((SinkFunction)new DiscardingSink());
        List outputSelectors = env.getStreamGraph().getStreamNode(Integer.valueOf(unionFilter.getId())).getOutputSelectors();
        Assert.assertEquals((long)1L, (long)outputSelectors.size());
        Assert.assertEquals(outputSelector, outputSelectors.get(0));
        DataStream select = split.select(new String[]{"a"});
        DataStreamSink sink = select.print();
        StreamEdge splitEdge = (StreamEdge)env.getStreamGraph().getStreamEdges(unionFilter.getId(), sink.getTransformation().getId()).get(0);
        Assert.assertEquals((Object)"a", splitEdge.getSelectedNames().get(0));
        DataStreamSink sinkWithIdentifier = select.print("identifier");
        StreamEdge newSplitEdge = (StreamEdge)env.getStreamGraph().getStreamEdges(unionFilter.getId(), sinkWithIdentifier.getTransformation().getId()).get(0);
        Assert.assertEquals((Object)"a", newSplitEdge.getSelectedNames().get(0));
        ConnectedStreams connect = map.connect((DataStream)flatMap);
        CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map1(Integer value) {
                return null;
            }

            public String map2(Integer value) {
                return null;
            }
        };
        SingleOutputStreamOperator coMap = connect.map((CoMapFunction)coMapper);
        coMap.addSink((SinkFunction)new DiscardingSink());
        Assert.assertEquals((Object)coMapper, (Object)DataStreamTest.getFunctionForDataStream(coMap));
        try {
            env.getStreamGraph().getStreamEdges(map.getId(), coMap.getId());
        }
        catch (RuntimeException e) {
            Assert.fail((String)e.getMessage());
        }
        try {
            env.getStreamGraph().getStreamEdges(flatMap.getId(), coMap.getId());
        }
        catch (RuntimeException e) {
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void sinkKeyTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSink sink = env.generateSequence(1L, 100L).print();
        Assert.assertTrue((env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getStatePartitioner1() == null ? 1 : 0) != 0);
        Assert.assertTrue((boolean)(((StreamEdge)env.getStreamGraph().getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getInEdges().get(0)).getPartitioner() instanceof ForwardPartitioner));
        KeySelector<Long, Long> key1 = new KeySelector<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long getKey(Long value) throws Exception {
                return 0L;
            }
        };
        DataStreamSink sink2 = env.generateSequence(1L, 100L).keyBy((KeySelector)key1).print();
        Assert.assertNotNull((Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStatePartitioner1());
        Assert.assertNotNull((Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStateKeySerializer());
        Assert.assertNotNull((Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStateKeySerializer());
        Assert.assertEquals((Object)key1, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStatePartitioner1());
        Assert.assertTrue((boolean)(((StreamEdge)env.getStreamGraph().getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getInEdges().get(0)).getPartitioner() instanceof KeyGroupStreamPartitioner));
        KeySelector<Long, Long> key2 = new KeySelector<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long getKey(Long value) throws Exception {
                return 0L;
            }
        };
        DataStreamSink sink3 = env.generateSequence(1L, 100L).keyBy((KeySelector)key2).print();
        Assert.assertTrue((env.getStreamGraph().getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getStatePartitioner1() != null ? 1 : 0) != 0);
        Assert.assertEquals((Object)key2, (Object)env.getStreamGraph().getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getStatePartitioner1());
        Assert.assertTrue((boolean)(((StreamEdge)env.getStreamGraph().getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getInEdges().get(0)).getPartitioner() instanceof KeyGroupStreamPartitioner));
    }

    @Test
    public void testChannelSelectors() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.generateSequence(0L, 0L);
        DataStream broadcast = src.broadcast();
        DataStreamSink broadcastSink = broadcast.print();
        StreamPartitioner broadcastPartitioner = ((StreamEdge)env.getStreamGraph().getStreamEdges(src.getId(), broadcastSink.getTransformation().getId()).get(0)).getPartitioner();
        Assert.assertTrue((boolean)(broadcastPartitioner instanceof BroadcastPartitioner));
        DataStream shuffle = src.shuffle();
        DataStreamSink shuffleSink = shuffle.print();
        StreamPartitioner shufflePartitioner = ((StreamEdge)env.getStreamGraph().getStreamEdges(src.getId(), shuffleSink.getTransformation().getId()).get(0)).getPartitioner();
        Assert.assertTrue((boolean)(shufflePartitioner instanceof ShufflePartitioner));
        DataStream forward = src.forward();
        DataStreamSink forwardSink = forward.print();
        StreamPartitioner forwardPartitioner = ((StreamEdge)env.getStreamGraph().getStreamEdges(src.getId(), forwardSink.getTransformation().getId()).get(0)).getPartitioner();
        Assert.assertTrue((boolean)(forwardPartitioner instanceof ForwardPartitioner));
        DataStream rebalance = src.rebalance();
        DataStreamSink rebalanceSink = rebalance.print();
        StreamPartitioner rebalancePartitioner = ((StreamEdge)env.getStreamGraph().getStreamEdges(src.getId(), rebalanceSink.getTransformation().getId()).get(0)).getPartitioner();
        Assert.assertTrue((boolean)(rebalancePartitioner instanceof RebalancePartitioner));
        DataStream global = src.global();
        DataStreamSink globalSink = global.print();
        StreamPartitioner globalPartitioner = ((StreamEdge)env.getStreamGraph().getStreamEdges(src.getId(), globalSink.getTransformation().getId()).get(0)).getPartitioner();
        Assert.assertTrue((boolean)(globalPartitioner instanceof GlobalPartitioner));
    }

    @Test
    public void testConsecutiveSplitRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{0, 0});
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        src.split(outputSelector).split(outputSelector).addSink((SinkFunction)new DiscardingSink());
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        env.getStreamGraph();
    }

    @Test
    public void testSplitAfterSideOutputRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{0, 0});
        OutputTag<Integer> outputTag = new OutputTag<Integer>("dummy"){};
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        src.getSideOutput((OutputTag)outputTag).split(outputSelector).addSink((SinkFunction)new DiscardingSink());
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Split after side-outputs are not supported. Splits are deprecated. Please use side-outputs.");
        env.getStreamGraph();
    }

    @Test
    public void testSelectBetweenConsecutiveSplitRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{0, 0});
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        src.split(outputSelector).select(new String[]{"dummy"}).split(outputSelector).addSink((SinkFunction)new DiscardingSink());
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        env.getStreamGraph();
    }

    @Test
    public void testUnionBetweenConsecutiveSplitRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{0, 0});
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        src.split(outputSelector).select(new String[]{"dummy"}).union(new DataStream[]{src.map((MapFunction & Serializable)x -> x)}).split(outputSelector).addSink((SinkFunction)new DiscardingSink());
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        env.getStreamGraph();
    }

    @Test
    public void testKeybyBetweenConsecutiveSplitRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{0, 0});
        DummyOutputSelector outputSelector = new DummyOutputSelector();
        src.split(outputSelector).select(new String[]{"dummy"}).keyBy((KeySelector & Serializable)x -> x).split(outputSelector).addSink((SinkFunction)new DiscardingSink());
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.");
        env.getStreamGraph();
    }

    @Test
    public void testPrimitiveArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, int[]> keySelector = new KeySelector<Tuple2<Integer[], String>, int[]>(){

            public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
                int[] ks = new int[((Integer[])value.f0).length];
                for (int i = 0; i < ks.length; ++i) {
                    ks[i] = ((Integer[])value.f0)[i];
                }
                return ks;
            }
        };
        this.testKeyRejection((KeySelector)keySelector, (TypeInformation)PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    }

    @Test
    public void testBasicArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector = new KeySelector<Tuple2<Integer[], String>, Integer[]>(){

            public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
                return (Integer[])value.f0;
            }
        };
        this.testKeyRejection((KeySelector)keySelector, (TypeInformation)BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
    }

    @Test
    public void testObjectArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, Object[]> keySelector = new KeySelector<Tuple2<Integer[], String>, Object[]>(){

            public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
                Object[] ks = new Object[((Integer[])value.f0).length];
                for (int i = 0; i < ks.length; ++i) {
                    ks[i] = new Object();
                }
                return ks;
            }
        };
        ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(Object[].class, (TypeInformation)new GenericTypeInfo(Object.class));
        this.testKeyRejection((KeySelector)keySelector, (TypeInformation)keyTypeInfo);
    }

    private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)new Integer[]{1, 2}, (Object)"barfoo")});
        Assert.assertEquals(expectedKeyType, (Object)TypeExtractor.getKeySelectorTypes(keySelector, (TypeInformation)input.getType()));
        this.expectedException.expect(InvalidProgramException.class);
        this.expectedException.expectMessage((Matcher)new StringStartsWith("Type " + expectedKeyType + " cannot be used as key."));
        input.keyBy(keySelector);
    }

    @Test
    public void testPOJOWithNestedArrayNoHashCodeKeyRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromElements((Object[])new POJOWithHashCode[]{new POJOWithHashCode(new int[]{1, 2})});
        TupleTypeInfo expectedTypeInfo = new TupleTypeInfo(new TypeInformation[]{PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO});
        this.expectedException.expect(InvalidProgramException.class);
        this.expectedException.expectMessage((Matcher)new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
        input.keyBy(new String[]{"id"});
    }

    @Test
    public void testPOJOWithNestedArrayAndHashCodeWorkAround() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromElements((Object[])new POJOWithHashCode[]{new POJOWithHashCode(new int[]{1, 2})});
        input.keyBy((KeySelector)new KeySelector<POJOWithHashCode, POJOWithHashCode>(){

            public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception {
                return value;
            }
        }).addSink((SinkFunction)new SinkFunction<POJOWithHashCode>(){

            public void invoke(POJOWithHashCode value) throws Exception {
                Assert.assertEquals((Object)value.getId(), (Object)new int[]{1, 2});
            }
        });
    }

    @Test
    public void testPOJOnoHashCodeKeyRejection() {
        KeySelector<POJOWithoutHashCode, POJOWithoutHashCode> keySelector = new KeySelector<POJOWithoutHashCode, POJOWithoutHashCode>(){

            public POJOWithoutHashCode getKey(POJOWithoutHashCode value) throws Exception {
                return value;
            }
        };
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromElements((Object[])new POJOWithoutHashCode[]{new POJOWithoutHashCode(new int[]{1, 2})});
        this.expectedException.expect(InvalidProgramException.class);
        input.keyBy((KeySelector)keySelector);
    }

    @Test
    public void testTupleNestedArrayKeyRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)new Integer[]{1, 2}, (Object)"test-test")});
        TupleTypeInfo expectedTypeInfo = new TupleTypeInfo(new TypeInformation[]{BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});
        this.expectedException.expect(InvalidProgramException.class);
        this.expectedException.expectMessage((Matcher)new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
        input.keyBy((KeySelector)new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>(){

            public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
                return value;
            }
        });
    }

    @Test
    public void testPrimitiveKeyAcceptance() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setMaxParallelism(1);
        DataStreamSource input = env.fromElements((Object[])new Integer[]{new Integer(10000)});
        KeyedStream keyedStream = input.keyBy((KeySelector)new KeySelector<Integer, Object>(){

            public Object getKey(Integer value) throws Exception {
                return value;
            }
        });
        keyedStream.addSink((SinkFunction)new SinkFunction<Integer>(){

            public void invoke(Integer value) throws Exception {
                Assert.assertEquals((long)10000L, (long)value.intValue());
            }
        });
    }

    private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        StreamGraph streamGraph = env.getStreamGraph();
        return streamGraph.getStreamNode(Integer.valueOf(dataStream.getId())).getOperator();
    }

    private static Function getFunctionForDataStream(DataStream<?> dataStream) {
        AbstractUdfStreamOperator operator = (AbstractUdfStreamOperator)DataStreamTest.getOperatorForDataStream(dataStream);
        return operator.getUserFunction();
    }

    private static Integer createDownStreamId(DataStream<?> dataStream) {
        return dataStream.print().getTransformation().getId();
    }

    private static boolean isKeyed(DataStream<?> dataStream) {
        return dataStream instanceof KeyedStream;
    }

    private static Integer createDownStreamId(ConnectedStreams dataStream) {
        SingleOutputStreamOperator coMap = dataStream.map((CoMapFunction)new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>(){
            private static final long serialVersionUID = 1L;

            public Object map1(Tuple2<Long, Long> value) {
                return null;
            }

            public Object map2(Tuple2<Long, Long> value) {
                return null;
            }
        });
        coMap.addSink((SinkFunction)new DiscardingSink());
        return coMap.getId();
    }

    private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
        return dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream;
    }

    private static boolean isPartitioned(List<StreamEdge> edges) {
        boolean result = true;
        for (StreamEdge edge : edges) {
            if (edge.getPartitioner() instanceof KeyGroupStreamPartitioner) continue;
            result = false;
        }
        return result;
    }

    private static boolean isCustomPartitioned(List<StreamEdge> edges) {
        boolean result = true;
        for (StreamEdge edge : edges) {
            if (edge.getPartitioner() instanceof CustomPartitionerWrapper) continue;
            result = false;
        }
        return result;
    }

    private class DummyOutputSelector<Integer>
    implements OutputSelector<Integer> {
        private DummyOutputSelector() {
        }

        public Iterable<String> select(Integer value) {
            return null;
        }
    }

    private static class CustomPOJO {
        private String s;
        private int i;

        public void setS(String s) {
            this.s = s;
        }

        public void setI(int i) {
            this.i = i;
        }

        public String getS() {
            return this.s;
        }

        public int getI() {
            return this.i;
        }
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class FirstSelector
    implements KeySelector<Tuple2<Long, Long>, Long> {
        private static final long serialVersionUID = 1L;

        private FirstSelector() {
        }

        public Long getKey(Tuple2<Long, Long> value) throws Exception {
            return (Long)value.f0;
        }
    }

    public static class POJOWithHashCode
    extends POJOWithoutHashCode {
        public POJOWithHashCode() {
        }

        public POJOWithHashCode(int[] id) {
            super(id);
        }

        public int hashCode() {
            int hash = 31;
            for (int i : this.getId()) {
                hash = 37 * hash + i;
            }
            return hash;
        }
    }

    public static class POJOWithoutHashCode {
        private int[] id;

        public POJOWithoutHashCode() {
        }

        public POJOWithoutHashCode(int[] id) {
            this.id = id;
        }

        public int[] getId() {
            return this.id;
        }

        public void setId(int[] id) {
            this.id = id;
        }
    }

    private static abstract class CustomWmEmitter<T>
    implements AssignerWithPunctuatedWatermarks<T> {
        private CustomWmEmitter() {
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }
}

