/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest
extends TestLogger {
    @Test
    public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.createStreamGraph();
        Assert.assertThat((Object)streamGraph.getGlobalDataExchangeMode(), (Matcher)CoreMatchers.is((Object)GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
    }

    @Test
    public void testAllEdgesBlockingMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.createStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testAllEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.createStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testForwardEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.createStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testPointwiseEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.createStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType());
        Assert.assertEquals((Object)ResultPartitionType.BLOCKING, (Object)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType());
    }

    @Test
    public void testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1, 2, 3}).setParallelism(1);
        DataStream forward = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), ShuffleMode.PIPELINED));
        forward.map((MapFunction & Serializable)i -> i).startNewChain().setParallelism(1);
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED_BOUNDED, (Object)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType());
    }

    private static StreamGraph createStreamGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromElements((Object[])new Integer[]{1, 2, 3}).setParallelism(1);
        DataStream forward = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), ShuffleMode.UNDEFINED));
        SingleOutputStreamOperator map1 = forward.map((MapFunction & Serializable)i -> i).startNewChain().setParallelism(1);
        DataStream rescale = new DataStream(env, (Transformation)new PartitionTransformation(map1.getTransformation(), (StreamPartitioner)new RescalePartitioner(), ShuffleMode.UNDEFINED));
        SingleOutputStreamOperator map2 = rescale.map((MapFunction & Serializable)i -> i).setParallelism(2);
        map2.rebalance().print().setParallelism(2);
        return env.getStreamGraph();
    }
}

