/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorTest
extends TestLogger {
    /**
     * Builds {@code fromElements -> map -> keyBy(0) -> map -> sink} with parallelism 1 and
     * checks the generated job graph: it must contain exactly two job vertices, both with
     * parallelism 1, and the data set between them must be {@code PIPELINED_BOUNDED} when
     * looked up from either side of the edge.
     */
    @Test
    public void testParallelismOneNotChained() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, String>> input = env
            .fromElements("a", "b", "c", "d", "e", "f")
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String value) {
                    return new Tuple2<>(value, value);
                }
            });

        DataStream<Tuple2<String, String>> result = input
            .keyBy(0)
            .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(Tuple2<String, String> value) {
                    return value;
                }
            });

        result.addSink(new SinkFunction<Tuple2<String, String>>() {
            @Override
            public void invoke(Tuple2<String, String> value) {
                // intentionally empty: the sink only terminates the topology
            }
        });

        StreamGraph streamGraph = env.getStreamGraph("test job");
        JobGraph jobGraph = streamGraph.getJobGraph();
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();

        Assert.assertEquals(2L, jobGraph.getNumberOfVertices());
        Assert.assertEquals(1L, verticesSorted.get(0).getParallelism());
        Assert.assertEquals(1L, verticesSorted.get(1).getParallelism());

        JobVertex sourceVertex = verticesSorted.get(0);
        JobVertex mapSinkVertex = verticesSorted.get(1);

        // The same result type must be visible from the producer and from the consumer side.
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            sourceVertex.getProducedDataSets().get(0).getResultType());
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            mapSinkVertex.getInputs().get(0).getSource().getResultType());
    }

    /**
     * A stream graph created from a fresh environment must report checkpointing as disabled,
     * and the job graph generated from it must carry a checkpoint interval of
     * {@link Long#MAX_VALUE} in its checkpoint coordinator configuration.
     */
    @Test
    public void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph graph = new StreamGraph(environment.getConfig(), environment.getCheckpointConfig());

        boolean checkpointingEnabled = graph.getCheckpointConfig().isCheckpointingEnabled();
        Assert.assertFalse("Checkpointing enabled", checkpointingEnabled);

        JobCheckpointingSettings settings =
            StreamingJobGraphGenerator.createJobGraph(graph).getCheckpointingSettings();
        long configuredInterval =
            settings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();

        Assert.assertEquals(Long.MAX_VALUE, configuredInterval);
    }

    /**
     * Builds {@code fromElements -> map -> print} with parallelism 2 and checks the
     * chain-start / chain-end flags stored in the stream configs:
     * the source config is both start and end, the map config (head of the second vertex)
     * is start but not end, and the chained print config is end but not start.
     * Also checks that the data set between the two vertices is {@code PIPELINED_BOUNDED}.
     */
    @Test
    public void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) throws Exception {
                    return value;
                }
            })
            .print();

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = verticesSorted.get(0);
        JobVertex mapPrintVertex = verticesSorted.get(1);

        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            sourceVertex.getProducedDataSets().get(0).getResultType());
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            mapPrintVertex.getInputs().get(0).getSource().getResultType());

        StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
        StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
        Map<Integer, StreamConfig> chainedConfigs =
            mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
        // The test takes the first chained config and treats it as the print operator's config.
        StreamConfig printConfig = chainedConfigs.values().iterator().next();

        Assert.assertTrue(sourceConfig.isChainStart());
        Assert.assertTrue(sourceConfig.isChainEnd());

        Assert.assertTrue(mapConfig.isChainStart());
        Assert.assertFalse(mapConfig.isChainEnd());

        Assert.assertFalse(printConfig.isChainStart());
        Assert.assertTrue(printConfig.isChainEnd());
    }

    /**
     * Assigns a distinct {@link ResourceSpec} to each operator of
     * {@code source -> map -> filter -> keyBy(0) -> reduce -> sink} and checks the merged
     * resources on the two resulting job vertices: the first vertex's min resources must
     * equal resource1+resource2+resource3, and the second vertex's preferred resources must
     * equal resource4+resource5.
     *
     * <p>{@code setResources} is looked up with {@code getDeclaredMethod} and made
     * accessible, so the test invokes it reflectively.
     */
    @Test
    public void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();

        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, Integer>> source = env.addSource(
            new ParallelSourceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                }

                @Override
                public void cancel() {
                }
            });
        opMethod.invoke(source, resource1);

        DataStream<Tuple2<Integer, Integer>> map = source.map(
            new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
                    return value;
                }
            });
        opMethod.invoke(map, resource2);

        DataStream<Tuple2<Integer, Integer>> filter = map.filter(
            new FilterFunction<Tuple2<Integer, Integer>>() {
                @Override
                public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
                    return false;
                }
            });
        opMethod.invoke(filter, resource3);

        DataStream<Tuple2<Integer, Integer>> reduce = filter.keyBy(0).reduce(
            new ReduceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> reduce(
                        Tuple2<Integer, Integer> value1,
                        Tuple2<Integer, Integer> value2) throws Exception {
                    return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
            });
        opMethod.invoke(reduce, resource4);

        DataStreamSink<Tuple2<Integer, Integer>> sink = reduce.addSink(
            new SinkFunction<Tuple2<Integer, Integer>>() {
                @Override
                public void invoke(Tuple2<Integer, Integer> value) throws Exception {
                }
            });
        sinkMethod.invoke(sink, resource5);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());

        // Sort once and reuse the result for both lookups.
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceMapFilterVertex = verticesSorted.get(0);
        JobVertex reduceSinkVertex = verticesSorted.get(1);

        Assert.assertTrue(
            sourceMapFilterVertex.getMinResources().equals(resource1.merge(resource2).merge(resource3)));
        Assert.assertTrue(
            reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5)));
    }

    /**
     * Assigns a distinct {@link ResourceSpec} to the operators of an iterative job
     * ({@code source -> iterate -> flatMap -> filter}, feedback closed with the filter output,
     * plus an unchained sink) and checks the resources of every job vertex whose name contains
     * one of the known markers:
     * <ul>
     *   <li>{@code test_source}: min resources equal resource1</li>
     *   <li>{@code Iteration_Source}: preferred resources equal resource2</li>
     *   <li>{@code test_flatMap}: min resources equal resource3 merged with resource4</li>
     *   <li>{@code Iteration_Tail}: preferred resources equal {@link ResourceSpec#DEFAULT}</li>
     *   <li>{@code test_sink}: min resources equal resource5</li>
     * </ul>
     * Vertices matching none of the markers are not checked.
     *
     * <p>{@code setResources} is looked up with {@code getDeclaredMethod} and made
     * accessible, so the test invokes it reflectively.
     */
    @Test
    public void testResourcesForIteration() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();

        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.addSource(new ParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            @Override
            public void cancel() {
            }
        }).name("test_source");
        opMethod.invoke(source, resource1);

        IterativeStream<Integer> iteration = source.iterate(3000L);
        opMethod.invoke(iteration, resource2);

        DataStream<Integer> flatMap = iteration.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                out.collect(value);
            }
        }).name("test_flatMap");
        opMethod.invoke(flatMap, resource3);

        DataStream<Integer> increment = flatMap.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return false;
            }
        }).name("test_filter");
        opMethod.invoke(increment, resource4);

        DataStreamSink<Integer> sink = iteration.closeWith(increment).addSink(new SinkFunction<Integer>() {
            @Override
            public void invoke(Integer value) throws Exception {
            }
        }).disableChaining().name("test_sink");
        sinkMethod.invoke(sink, resource5);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            String vertexName = jobVertex.getName();
            if (vertexName.contains("test_source")) {
                Assert.assertTrue(jobVertex.getMinResources().equals(resource1));
            } else if (vertexName.contains("Iteration_Source")) {
                Assert.assertTrue(jobVertex.getPreferredResources().equals(resource2));
            } else if (vertexName.contains("test_flatMap")) {
                // flatMap and filter are expected in one vertex, hence the merged spec
                Assert.assertTrue(jobVertex.getMinResources().equals(resource3.merge(resource4)));
            } else if (vertexName.contains("Iteration_Tail")) {
                Assert.assertTrue(jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT));
            } else if (vertexName.contains("test_sink")) {
                Assert.assertTrue(jobVertex.getMinResources().equals(resource5));
            }
        }
    }

    /**
     * Builds a job with one input-format source named {@code source} and two output-format
     * sinks named {@code sink1} and {@code sink2}, then checks that:
     * <ul>
     *   <li>the job graph has a single vertex and it is an {@link InputOutputFormatVertex};</li>
     *   <li>the vertex's format container holds one input format and two output formats;</li>
     *   <li>looking the formats up by operator ID yields a {@link TypeSerializerInputFormat}
     *       for the source and a {@link DiscardingOutputFormat} for each sink.</li>
     * </ul>
     * Operator IDs are resolved by operator name from the head config and its transitively
     * chained configs.
     */
    @Test
    public void testInputOutputFormat() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> source = env.addSource(
            new InputFormatSourceFunction<>(
                new TypeSerializerInputFormat<>(TypeInformation.of(Long.class)),
                TypeInformation.of(Long.class)),
            TypeInformation.of(Long.class)).name("source");

        source.writeUsingOutputFormat(new DiscardingOutputFormat<Long>()).name("sink1");
        source.writeUsingOutputFormat(new DiscardingOutputFormat<Long>()).name("sink2");

        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        Assert.assertEquals(1L, jobGraph.getNumberOfVertices());

        JobVertex jobVertex = jobGraph.getVertices().iterator().next();
        Assert.assertTrue(jobVertex instanceof InputOutputFormatVertex);

        InputOutputFormatContainer formatContainer = new InputOutputFormatContainer(
            new TaskConfig(jobVertex.getConfiguration()),
            Thread.currentThread().getContextClassLoader());
        Map<OperatorID, UserCodeWrapper<? extends InputFormat<?, ?>>> inputFormats =
            formatContainer.getInputFormats();
        Map<OperatorID, UserCodeWrapper<? extends OutputFormat<?>>> outputFormats =
            formatContainer.getOutputFormats();
        Assert.assertEquals(1L, inputFormats.size());
        Assert.assertEquals(2L, outputFormats.size());

        // Collect operator name -> operator ID for the chain head and every chained operator.
        Map<String, OperatorID> nameToOperatorIds = new HashMap<>();
        StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
        nameToOperatorIds.put(headConfig.getOperatorName(), headConfig.getOperatorID());

        Map<Integer, StreamConfig> chainedConfigs =
            headConfig.getTransitiveChainedTaskConfigs(Thread.currentThread().getContextClassLoader());
        for (StreamConfig config : chainedConfigs.values()) {
            nameToOperatorIds.put(config.getOperatorName(), config.getOperatorID());
        }

        InputFormat<?, ?> sourceFormat =
            inputFormats.get(nameToOperatorIds.get("Source: source")).getUserCodeObject();
        Assert.assertTrue(sourceFormat instanceof TypeSerializerInputFormat);

        OutputFormat<?> sinkFormat1 =
            outputFormats.get(nameToOperatorIds.get("Sink: sink1")).getUserCodeObject();
        Assert.assertTrue(sinkFormat1 instanceof DiscardingOutputFormat);

        OutputFormat<?> sinkFormat2 =
            outputFormats.get(nameToOperatorIds.get("Sink: sink2")).getUserCodeObject();
        Assert.assertTrue(sinkFormat2 instanceof DiscardingOutputFormat);
    }

    /**
     * Builds {@code fromElements -> (forward, PIPELINED) -> map[p=1] -> (rescale, PIPELINED)
     * -> print[p=2]} and checks that the job graph has two vertices and that the first
     * vertex's produced data set is {@code PIPELINED_BOUNDED}.
     */
    @Test
    public void testShuffleModePipelined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);

        // explicit PIPELINED partition between source and map
        DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                sourceDataStream.getTransformation(),
                new ForwardPartitioner<>(),
                ShuffleMode.PIPELINED));
        DataStream<Integer> mapDataStream =
            partitionAfterSourceDataStream.map(value -> value).setParallelism(1);

        // explicit PIPELINED partition between map and print
        DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                mapDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.PIPELINED));
        partitionAfterMapDataStream.print().setParallelism(2);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(2L, verticesSorted.size());

        // two vertices in total: the test expects source and map to share the first one
        JobVertex sourceAndMapVertex = verticesSorted.get(0);
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
    }

    /**
     * Builds {@code fromElements -> (forward, BATCH) -> map[p=1] -> (rescale, BATCH)
     * -> print[p=2]} and checks that the job graph has three vertices and that the data sets
     * produced by the first two vertices are both {@code BLOCKING}.
     */
    @Test
    public void testShuffleModeBatch() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);

        // explicit BATCH partition between source and map
        DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                sourceDataStream.getTransformation(),
                new ForwardPartitioner<>(),
                ShuffleMode.BATCH));
        DataStream<Integer> mapDataStream =
            partitionAfterSourceDataStream.map(value -> value).setParallelism(1);

        // explicit BATCH partition between map and print
        DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                mapDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.BATCH));
        partitionAfterMapDataStream.print().setParallelism(2);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();

        // three vertices: nothing is chained across the BATCH edges
        Assert.assertEquals(3L, verticesSorted.size());

        JobVertex sourceVertex = verticesSorted.get(0);
        JobVertex mapVertex = verticesSorted.get(1);
        Assert.assertEquals(
            ResultPartitionType.BLOCKING,
            sourceVertex.getProducedDataSets().get(0).getResultType());
        Assert.assertEquals(
            ResultPartitionType.BLOCKING,
            mapVertex.getProducedDataSets().get(0).getResultType());
    }

    /**
     * Builds {@code fromElements -> (forward, UNDEFINED) -> map[p=1] -> (rescale, UNDEFINED)
     * -> print[p=2]} and checks that the job graph has two vertices and that the first
     * vertex's produced data set is {@code PIPELINED_BOUNDED}.
     */
    @Test
    public void testShuffleModeUndefined() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);

        // partition with an UNDEFINED shuffle mode between source and map
        DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                sourceDataStream.getTransformation(),
                new ForwardPartitioner<>(),
                ShuffleMode.UNDEFINED));
        DataStream<Integer> mapDataStream =
            partitionAfterSourceDataStream.map(value -> value).setParallelism(1);

        // partition with an UNDEFINED shuffle mode between map and print
        DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                mapDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.UNDEFINED));
        partitionAfterMapDataStream.print().setParallelism(2);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(2L, verticesSorted.size());

        // two vertices in total: the test expects source and map to share the first one
        JobVertex sourceAndMapVertex = verticesSorted.get(0);
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
    }

    /**
     * Builds an iterative job ({@code source -> iterate -> map -> filter}, feedback closed
     * with the filter output, then {@code print}) and checks slot sharing and co-location:
     * <ul>
     *   <li>every job vertex has the same, non-null slot sharing group as the first vertex;</li>
     *   <li>vertices whose name starts with {@code IterationSource} or {@code IterationSink}
     *       belong to a co-location group that contains them, and both use the same group;</li>
     *   <li>all other vertices have no co-location group.</li>
     * </ul>
     */
    @Test
    public void testIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
        IterativeStream<Integer> iteration = source.iterate(3000L);
        iteration.name("iteration").setParallelism(2);
        DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2);
        DataStream<Integer> filter = map.filter(x -> false).name("filter").setParallelism(2);
        iteration.closeWith(filter).print();

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());

        SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
        Assert.assertNotNull(slotSharingGroup);

        CoLocationGroup iterationSourceCoLocationGroup = null;
        CoLocationGroup iterationSinkCoLocationGroup = null;

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            // all vertices are expected to share one slot sharing group
            Assert.assertEquals(slotSharingGroup, jobVertex.getSlotSharingGroup());

            String vertexName = jobVertex.getName();
            if (vertexName.startsWith("IterationSource")) {
                iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
                Assert.assertTrue(iterationSourceCoLocationGroup.getVertices().contains(jobVertex));
            } else if (vertexName.startsWith("IterationSink")) {
                iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
                Assert.assertTrue(iterationSinkCoLocationGroup.getVertices().contains(jobVertex));
            } else {
                // non-iteration vertices must not be co-located
                Assert.assertNull(jobVertex.getCoLocationGroup());
            }
        }

        Assert.assertNotNull(iterationSourceCoLocationGroup);
        Assert.assertNotNull(iterationSinkCoLocationGroup);
        Assert.assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup);
    }

    /**
     * Builds the same iterative job as a plain iteration test, but generates the stream graph
     * through a {@link StreamGraphGenerator} with slot sharing disabled, and checks that:
     * <ul>
     *   <li>vertices whose name starts with {@code IterationSource} or {@code IterationSink}
     *       still have a non-null slot sharing group and a non-null co-location group, and
     *       both vertices use the same groups;</li>
     *   <li>all other vertices have no slot sharing group.</li>
     * </ul>
     */
    @Test
    public void testDisableSlotSharingForIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
        IterativeStream<Integer> iteration = source.iterate(3000L);
        iteration.name("iteration").setParallelism(2);
        DataStream<Integer> map = iteration.map(x -> x + 1).name("map").setParallelism(2);
        DataStream<Integer> filter = map.filter(x -> false).name("filter").setParallelism(2);
        iteration.closeWith(filter).print();

        // Only these four transformations are handed to the generator.
        List<Transformation<?>> transformations = new ArrayList<>();
        transformations.add(source.getTransformation());
        transformations.add(iteration.getTransformation());
        transformations.add(map.getTransformation());
        transformations.add(filter.getTransformation());

        StreamGraphGenerator generator =
            new StreamGraphGenerator(transformations, env.getConfig(), env.getCheckpointConfig());
        generator.setSlotSharingEnabled(false);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(generator.generate());

        SlotSharingGroup iterationSourceSlotSharingGroup = null;
        SlotSharingGroup iterationSinkSlotSharingGroup = null;

        CoLocationGroup iterationSourceCoLocationGroup = null;
        CoLocationGroup iterationSinkCoLocationGroup = null;

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            String vertexName = jobVertex.getName();
            if (vertexName.startsWith("IterationSource")) {
                iterationSourceSlotSharingGroup = jobVertex.getSlotSharingGroup();
                iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
            } else if (vertexName.startsWith("IterationSink")) {
                iterationSinkSlotSharingGroup = jobVertex.getSlotSharingGroup();
                iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
            } else {
                // with slot sharing disabled, non-iteration vertices must have no group
                Assert.assertNull(jobVertex.getSlotSharingGroup());
            }
        }

        Assert.assertNotNull(iterationSourceSlotSharingGroup);
        Assert.assertNotNull(iterationSinkSlotSharingGroup);
        Assert.assertEquals(iterationSourceSlotSharingGroup, iterationSinkSlotSharingGroup);

        Assert.assertNotNull(iterationSourceCoLocationGroup);
        Assert.assertNotNull(iterationSinkCoLocationGroup);
        Assert.assertEquals(iterationSourceCoLocationGroup, iterationSinkCoLocationGroup);
    }

    /**
     * A job graph generated from an empty list of transformations, with no schedule mode
     * set on the generator, must report {@link ScheduleMode#EAGER}.
     */
    @Test
    public void testDefaultScheduleMode() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(
            Collections.<Transformation<?>>emptyList(),
            environment.getConfig(),
            environment.getCheckpointConfig());
        StreamGraph emptyGraph = graphGenerator.generate();

        ScheduleMode actualMode = StreamingJobGraphGenerator.createJobGraph(emptyGraph).getScheduleMode();
        Assert.assertEquals(ScheduleMode.EAGER, actualMode);
    }

    /**
     * A schedule mode set on the stream graph generator
     * ({@link ScheduleMode#LAZY_FROM_SOURCES} here) must be reported by the generated job graph.
     */
    @Test
    public void testSetScheduleMode() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the setter chained on the constructor result, as the generated graph must come
        // from whatever object setScheduleMode returns.
        StreamGraph emptyGraph = new StreamGraphGenerator(
                Collections.<Transformation<?>>emptyList(),
                environment.getConfig(),
                environment.getCheckpointConfig())
            .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
            .generate();

        ScheduleMode actualMode = StreamingJobGraphGenerator.createJobGraph(emptyGraph).getScheduleMode();
        Assert.assertEquals(ScheduleMode.LAZY_FROM_SOURCES, actualMode);
    }

    /**
     * Builds {@code fromElements -> (rescale, UNDEFINED) -> filter[p=2] -> (forward, UNDEFINED)
     * -> print[p=2]} without enabling blocking connections between chains, and checks that
     * the job graph has two vertices connected by a {@code PIPELINED_BOUNDED} data set, as
     * seen from both the producing and the consuming vertex.
     */
    @Test
    public void testBlockingAfterChainingOffDisabled() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromElements -> filter -> print
        DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);

        // partition with an UNDEFINED shuffle mode between source and filter
        DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                sourceDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.UNDEFINED));
        DataStream<Integer> filterDataStream =
            partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);

        // partition with an UNDEFINED shuffle mode between filter and print
        DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                filterDataStream.getTransformation(),
                new ForwardPartitioner<>(),
                ShuffleMode.UNDEFINED));
        partitionAfterFilterDataStream.print().setParallelism(2);

        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());

        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(2L, verticesSorted.size());

        JobVertex sourceVertex = verticesSorted.get(0);
        JobVertex filterAndPrintVertex = verticesSorted.get(1);

        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            sourceVertex.getProducedDataSets().get(0).getResultType());
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
    }

    /**
     * Builds a job with blocking connections between chains enabled on the stream graph and
     * checks that the job graph has three vertices where:
     * <ul>
     *   <li>the data set produced by the first vertex (edge declared with an UNDEFINED
     *       shuffle mode) is {@code BLOCKING};</li>
     *   <li>the data set produced by the second vertex (edge declared with a PIPELINED
     *       shuffle mode) is {@code PIPELINED_BOUNDED}, seen from both its producer and the
     *       third vertex's input.</li>
     * </ul>
     */
    @Test
    public void testBlockingConnectionsBetweenChainsEnabled() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);

        // partition with an UNDEFINED shuffle mode between source and filter
        DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                sourceDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.UNDEFINED));
        DataStream<Integer> filterDataStream =
            partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);

        // partition with an UNDEFINED shuffle mode between filter and map
        DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                filterDataStream.getTransformation(),
                new ForwardPartitioner<>(),
                ShuffleMode.UNDEFINED));
        partitionAfterFilterDataStream.map(value -> value).setParallelism(2);

        // NOTE(review): despite its name, this partition is built on the FILTER's
        // transformation, not on the map's output, so print consumes the filter result.
        // Kept as-is because the assertions below were written against this topology;
        // confirm whether that is intended.
        DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(
            env,
            new PartitionTransformation<>(
                filterDataStream.getTransformation(),
                new RescalePartitioner<>(),
                ShuffleMode.PIPELINED));
        partitionAfterMapDataStream.print().setParallelism(1);

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setBlockingConnectionsBetweenChains(true);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(3L, verticesSorted.size());

        JobVertex sourceVertex = verticesSorted.get(0);
        JobVertex filterAndMapVertex = verticesSorted.get(1);
        JobVertex printVertex = verticesSorted.get(2);

        // UNDEFINED edge between chains becomes BLOCKING when the flag is enabled
        Assert.assertEquals(
            ResultPartitionType.BLOCKING,
            sourceVertex.getProducedDataSets().get(0).getResultType());
        // explicitly PIPELINED edge stays PIPELINED_BOUNDED
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            filterAndMapVertex.getProducedDataSets().get(0).getResultType());
        Assert.assertEquals(
            ResultPartitionType.PIPELINED_BOUNDED,
            printVertex.getInputs().get(0).getSource().getResultType());
    }
}

