package org.apache.flink.streaming.api.graph;

import java.lang.reflect.Method;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.class */
public class StreamingJobGraphGeneratorTest extends TestLogger {
    @Test
    public void testParallelismOneNotChained() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new String[]{"a", "b", "c", "d", "e", "f"}).map(new MapFunction<String, Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.1
            public Tuple2<String, String> map(String str) {
                return new Tuple2<>(str, str);
            }
        }).keyBy(new int[]{0}).map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.2
            public Tuple2<String, String> map(Tuple2<String, String> tuple2) {
                return tuple2;
            }
        }).addSink(new SinkFunction<Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.3
            public void invoke(Tuple2<String, String> tuple2) {
            }
        });
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        streamGraph.setJobName("test job");
        List verticesSortedTopologicallyFromSources = streamGraph.getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(2L, r0.getNumberOfVertices());
        Assert.assertEquals(1L, ((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getParallelism());
        Assert.assertEquals(1L, ((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getParallelism());
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        Assert.assertEquals(ResultPartitionType.PIPELINED_BOUNDED, ((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals(ResultPartitionType.PIPELINED_BOUNDED, ((JobEdge) jobVertex2.getInputs().get(0)).getSource().getResultType());
    }

    @Test
    public void testDisabledCheckpointing() throws Exception {
        StreamGraph streamGraph = new StreamGraph(StreamExecutionEnvironment.getExecutionEnvironment());
        Assert.assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
        Assert.assertEquals(Long.MAX_VALUE, StreamingJobGraphGenerator.createJobGraph(streamGraph).getCheckpointingSettings().getCheckpointCoordinatorConfiguration().getCheckpointInterval());
    }

    @Test
    public void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).map(new MapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.4
            public Integer map(Integer num) throws Exception {
                return num;
            }
        }).print();
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        Assert.assertEquals(ResultPartitionType.PIPELINED_BOUNDED, ((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals(ResultPartitionType.PIPELINED_BOUNDED, ((JobEdge) jobVertex2.getInputs().get(0)).getSource().getResultType());
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        StreamConfig streamConfig2 = new StreamConfig(jobVertex2.getConfiguration());
        StreamConfig streamConfig3 = (StreamConfig) streamConfig2.getTransitiveChainedTaskConfigs(getClass().getClassLoader()).values().iterator().next();
        Assert.assertTrue(streamConfig.isChainStart());
        Assert.assertTrue(streamConfig.isChainEnd());
        Assert.assertTrue(streamConfig2.isChainStart());
        Assert.assertFalse(streamConfig2.isChainEnd());
        Assert.assertFalse(streamConfig3.isChainStart());
        Assert.assertTrue(streamConfig3.isChainEnd());
    }

    @Test
    public void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder().setCpuCores(0.1d).setHeapMemoryInMB(100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder().setCpuCores(0.2d).setHeapMemoryInMB(200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder().setCpuCores(0.3d).setHeapMemoryInMB(300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder().setCpuCores(0.4d).setHeapMemoryInMB(400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder().setCpuCores(0.5d).setHeapMemoryInMB(500).build();
        Method declaredMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod.setAccessible(true);
        Method declaredMethod2 = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod2.setAccessible(true);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.5
            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            }

            public void cancel() {
            }
        });
        declaredMethod.invoke(addSource, build);
        SingleOutputStreamOperator map = addSource.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.6
            public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return tuple2;
            }
        });
        declaredMethod.invoke(map, build2);
        SingleOutputStreamOperator filter = map.filter(new FilterFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.7
            public boolean filter(Tuple2<Integer, Integer> tuple2) throws Exception {
                return false;
            }
        });
        declaredMethod.invoke(filter, build3);
        SingleOutputStreamOperator reduce = filter.keyBy(new int[]{0}).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.8
            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        });
        declaredMethod.invoke(reduce, build4);
        declaredMethod2.invoke(reduce.addSink(new SinkFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.9
            public void invoke(Tuple2<Integer, Integer> tuple2) throws Exception {
            }
        }), build5);
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        JobVertex jobVertex = (JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex jobVertex2 = (JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertTrue(jobVertex.getMinResources().equals(build.merge(build2).merge(build3)));
        Assert.assertTrue(jobVertex2.getPreferredResources().equals(build4.merge(build5)));
    }

    @Test
    public void testResourcesForIteration() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder().setCpuCores(0.1d).setHeapMemoryInMB(100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder().setCpuCores(0.2d).setHeapMemoryInMB(200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder().setCpuCores(0.3d).setHeapMemoryInMB(300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder().setCpuCores(0.4d).setHeapMemoryInMB(400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder().setCpuCores(0.5d).setHeapMemoryInMB(500).build();
        Method declaredMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod.setAccessible(true);
        Method declaredMethod2 = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod2.setAccessible(true);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator name = executionEnvironment.addSource(new ParallelSourceFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.10
            public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            }

            public void cancel() {
            }
        }).name("test_source");
        declaredMethod.invoke(name, build);
        IterativeStream iterate = name.iterate(3000L);
        declaredMethod.invoke(iterate, build2);
        SingleOutputStreamOperator name2 = iterate.flatMap(new FlatMapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.11
            public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                collector.collect(num);
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Integer) obj, (Collector<Integer>) collector);
            }
        }).name("test_flatMap");
        declaredMethod.invoke(name2, build3);
        SingleOutputStreamOperator name3 = name2.filter(new FilterFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.12
            public boolean filter(Integer num) throws Exception {
                return false;
            }
        }).name("test_filter");
        declaredMethod.invoke(name3, build4);
        declaredMethod2.invoke(iterate.closeWith(name3).addSink(new SinkFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.13
            public void invoke(Integer num) throws Exception {
            }
        }).disableChaining().name("test_sink"), build5);
        for (JobVertex jobVertex : StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVertices()) {
            if (jobVertex.getName().contains("test_source")) {
                Assert.assertTrue(jobVertex.getMinResources().equals(build));
            } else if (jobVertex.getName().contains("Iteration_Source")) {
                Assert.assertTrue(jobVertex.getPreferredResources().equals(build2));
            } else if (jobVertex.getName().contains("test_flatMap")) {
                Assert.assertTrue(jobVertex.getMinResources().equals(build3.merge(build4)));
            } else if (jobVertex.getName().contains("Iteration_Tail")) {
                Assert.assertTrue(jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT));
            } else if (jobVertex.getName().contains("test_sink")) {
                Assert.assertTrue(jobVertex.getMinResources().equals(build5));
            }
        }
    }
}
