/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorTest
extends TestLogger {
    @Test
    public void testParallelismOneNotChained() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new String[]{"a", "b", "c", "d", "e", "f"}).map((MapFunction)new MapFunction<String, Tuple2<String, String>>(){

            public Tuple2<String, String> map(String value) {
                return new Tuple2((Object)value, (Object)value);
            }
        });
        SingleOutputStreamOperator result = input.keyBy(new int[]{0}).map((MapFunction)new MapFunction<Tuple2<String, String>, Tuple2<String, String>>(){

            public Tuple2<String, String> map(Tuple2<String, String> value) {
                return value;
            }
        });
        result.addSink((SinkFunction)new SinkFunction<Tuple2<String, String>>(){

            public void invoke(Tuple2<String, String> value) {
            }
        });
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("test job");
        JobGraph jobGraph = streamGraph.getJobGraph();
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((long)2L, (long)jobGraph.getNumberOfVertices());
        Assert.assertEquals((long)1L, (long)((JobVertex)verticesSorted.get(0)).getParallelism());
        Assert.assertEquals((long)1L, (long)((JobVertex)verticesSorted.get(1)).getParallelism());
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapSinkVertex = (JobVertex)verticesSorted.get(1);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((JobEdge)mapSinkVertex.getInputs().get(0)).getSource().getResultType());
    }

    @Test
    public void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph streamGraph = new StreamGraph(env);
        Assert.assertFalse((String)"Checkpointing enabled", (boolean)streamGraph.getCheckpointConfig().isCheckpointingEnabled());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
        Assert.assertEquals((long)Long.MAX_VALUE, (long)snapshottingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
    }

    @Test
    public void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new Integer[]{1, 2, 3}).map((MapFunction)new MapFunction<Integer, Integer>(){

            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).print();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex mapPrintVertex = (JobVertex)verticesSorted.get(1);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((JobEdge)mapPrintVertex.getInputs().get(0)).getSource().getResultType());
        StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
        StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
        Map chainedConfigs = mapConfig.getTransitiveChainedTaskConfigs(((Object)((Object)this)).getClass().getClassLoader());
        StreamConfig printConfig = (StreamConfig)chainedConfigs.values().iterator().next();
        Assert.assertTrue((boolean)sourceConfig.isChainStart());
        Assert.assertTrue((boolean)sourceConfig.isChainEnd());
        Assert.assertTrue((boolean)mapConfig.isChainStart());
        Assert.assertFalse((boolean)mapConfig.isChainEnd());
        Assert.assertFalse((boolean)printConfig.isChainStart());
        Assert.assertTrue((boolean)printConfig.isChainEnd());
    }

    @Test
    public void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new ParallelSourceFunction<Tuple2<Integer, Integer>>(){

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            }

            public void cancel() {
            }
        });
        opMethod.invoke((Object)source, resource1);
        SingleOutputStreamOperator map = source.map((MapFunction)new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
                return value;
            }
        });
        opMethod.invoke((Object)map, resource2);
        SingleOutputStreamOperator filter = map.filter((FilterFunction)new FilterFunction<Tuple2<Integer, Integer>>(){

            public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
                return false;
            }
        });
        opMethod.invoke((Object)filter, resource3);
        SingleOutputStreamOperator reduce = filter.keyBy(new int[]{0}).reduce((ReduceFunction)new ReduceFunction<Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                return new Tuple2(value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        });
        opMethod.invoke((Object)reduce, resource4);
        DataStreamSink sink = reduce.addSink((SinkFunction)new SinkFunction<Tuple2<Integer, Integer>>(){

            public void invoke(Tuple2<Integer, Integer> value) throws Exception {
            }
        });
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        JobVertex sourceMapFilterVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex reduceSinkVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertTrue((boolean)sourceMapFilterVertex.getMinResources().equals((Object)resource1.merge(resource2).merge(resource3)));
        Assert.assertTrue((boolean)reduceSinkVertex.getPreferredResources().equals((Object)resource4.merge(resource5)));
    }

    @Test
    public void testResourcesForIteration() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator source = env.addSource((SourceFunction)new ParallelSourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            }

            public void cancel() {
            }
        }).name("test_source");
        opMethod.invoke((Object)source, resource1);
        IterativeStream iteration = source.iterate(3000L);
        opMethod.invoke((Object)iteration, resource2);
        SingleOutputStreamOperator flatMap = iteration.flatMap((FlatMapFunction)new FlatMapFunction<Integer, Integer>(){

            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                out.collect((Object)value);
            }
        }).name("test_flatMap");
        opMethod.invoke((Object)flatMap, resource3);
        SingleOutputStreamOperator increment = flatMap.filter((FilterFunction)new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return false;
            }
        }).name("test_filter");
        opMethod.invoke((Object)increment, resource4);
        DataStreamSink sink = iteration.closeWith((DataStream)increment).addSink((SinkFunction)new SinkFunction<Integer>(){

            public void invoke(Integer value) throws Exception {
            }
        }).disableChaining().name("test_sink");
        sinkMethod.invoke((Object)sink, resource5);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.getName().contains("test_source")) {
                Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource1));
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Source")) {
                Assert.assertTrue((boolean)jobVertex.getPreferredResources().equals((Object)resource2));
                continue;
            }
            if (jobVertex.getName().contains("test_flatMap")) {
                Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource3.merge(resource4)));
                continue;
            }
            if (jobVertex.getName().contains("Iteration_Tail")) {
                Assert.assertTrue((boolean)jobVertex.getPreferredResources().equals((Object)ResourceSpec.DEFAULT));
                continue;
            }
            if (!jobVertex.getName().contains("test_sink")) continue;
            Assert.assertTrue((boolean)jobVertex.getMinResources().equals((Object)resource5));
        }
    }
}

