/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that inspect the {@link StreamGraph} returned by
 * {@link StreamExecutionEnvironment#getStreamGraph()} for small topologies.
 *
 * <p>Two things are verified:
 * <ul>
 *   <li>partitioning, split/select and union steps show up as properties of the edges and nodes
 *       of the operators around them (partitioner, selected names, output selectors);</li>
 *   <li>operators implementing {@link OutputTypeConfigurable} have received their output type
 *       by the time the stream graph has been obtained.</li>
 * </ul>
 */
public class StreamGraphGeneratorTest {

    /**
     * Builds a topology mixing partitioning, split/select and union, then checks the partitioner,
     * selected names and output selectors recorded for the surrounding operators.
     */
    @Test
    public void testVirtualTransformations() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10);
        DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());

        // Several partitionings are chained; the assertions below expect only the last one
        // (broadcast) to be visible on the edge into broadcastMap.
        DataStream<Integer> broadcastMap =
                rebalanceMap.forward().global().broadcast().map(new NoOpIntMap());
        broadcastMap.addSink(new DiscardingSink<Integer>());

        // Three branches that combine partitioning and split/select in different orders
        // before being unioned into a single downstream map.
        EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
        EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
        EvenOddOutputSelector selector3 = new EvenOddOutputSelector();

        DataStream<Integer> map1Operator = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map1 = map1Operator.broadcast().split(selector1).select("even");

        DataStream<Integer> map2Operator = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map2 = map2Operator.split(selector2).select("odd").global();

        DataStream<Integer> map3Operator = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map3 = map3Operator.global().split(selector3).select("even").shuffle();

        DataStream<Integer> unionedMap = map1.union(map2).union(map3).map(new NoOpIntMap());
        unionedMap.addSink(new DiscardingSink<Integer>());

        StreamGraph graph = env.getStreamGraph();

        // rebalanceMap is fed through a rebalance partitioner.
        assertPartitioner(RebalancePartitioner.class, firstInEdge(graph, rebalanceMap));

        // broadcastMap is fed directly by rebalanceMap through the last partitioner that was set.
        StreamEdge broadcastMapInput = firstInEdge(graph, broadcastMap);
        assertPartitioner(BroadcastPartitioner.class, broadcastMapInput);
        Assert.assertEquals(rebalanceMap.getId(), broadcastMapInput.getSourceVertex().getId());

        // Each branch keeps its own partitioner, selected name and selector across the union.
        assertOutput(graph, map1Operator, BroadcastPartitioner.class, "even", selector1);
        assertOutput(graph, map2Operator, GlobalPartitioner.class, "odd", selector2);
        assertOutput(graph, map3Operator, ShufflePartitioner.class, "even", selector3);
    }

    /**
     * Applies broadcast and split/select <em>after</em> a union of three maps and checks that
     * every one of the unioned maps ends up with the broadcast partitioner, the selected name
     * and the selector on its own output.
     */
    @Test
    public void testVirtualTransformations2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10);
        DataStream<Integer> rebalanceMap = source.rebalance().map(new NoOpIntMap());

        DataStream<Integer> map1 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map2 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map3 = rebalanceMap.map(new NoOpIntMap());

        EvenOddOutputSelector selector = new EvenOddOutputSelector();

        DataStream<Integer> unionedMap = map1.union(map2).union(map3)
                .broadcast()
                .split(selector)
                .select("foo")
                .map(new NoOpIntMap());
        unionedMap.addSink(new DiscardingSink<Integer>());

        StreamGraph graph = env.getStreamGraph();

        assertOutput(graph, map1, BroadcastPartitioner.class, "foo", selector);
        assertOutput(graph, map2, BroadcastPartitioner.class, "foo", selector);
        assertOutput(graph, map3, BroadcastPartitioner.class, "foo", selector);
    }

    /**
     * Checks that a one-input operator implementing {@link OutputTypeConfigurable} has been handed
     * the declared output type once the stream graph has been obtained.
     */
    @Test
    public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 10);

        OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation =
                new OutputTypeConfigurableOperationWithOneInput();

        DataStream<Integer> result = source.transform(
                "Single input and output type configurable operation",
                BasicTypeInfo.INT_TYPE_INFO,
                outputTypeConfigurableOperation);
        result.addSink(new DiscardingSink<Integer>());

        // Only the side effect on the operator matters here; the returned graph is not inspected.
        env.getStreamGraph();

        Assert.assertEquals(
                BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
    }

    /**
     * Checks that a two-input operator implementing {@link OutputTypeConfigurable} has been handed
     * the declared output type once the stream graph has been obtained.
     */
    @Test
    public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source1 = env.fromElements(1, 10);
        DataStream<Integer> source2 = env.fromElements(2, 11);
        ConnectedStreams<Integer, Integer> connectedSource = source1.connect(source2);

        OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation =
                new OutputTypeConfigurableOperationWithTwoInputs();

        DataStream<Integer> result = connectedSource.transform(
                "Two input and output type configurable operation",
                BasicTypeInfo.INT_TYPE_INFO,
                outputTypeConfigurableOperation);
        result.addSink(new DiscardingSink<Integer>());

        // Only the side effect on the operator matters here; the returned graph is not inspected.
        env.getStreamGraph();

        Assert.assertEquals(
                BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
    }

    /** Returns the first incoming edge of the graph node that belongs to {@code stream}. */
    private static StreamEdge firstInEdge(StreamGraph graph, DataStream<?> stream) {
        return graph.getStreamNode(stream.getId()).getInEdges().get(0);
    }

    /** Returns the first outgoing edge of the graph node that belongs to {@code stream}. */
    private static StreamEdge firstOutEdge(StreamGraph graph, DataStream<?> stream) {
        return graph.getStreamNode(stream.getId()).getOutEdges().get(0);
    }

    /** Asserts that the partitioner attached to {@code edge} is an instance of {@code expectedType}. */
    private static void assertPartitioner(Class<?> expectedType, StreamEdge edge) {
        Assert.assertTrue(
                "Expected a " + expectedType.getSimpleName() + " but found " + edge.getPartitioner(),
                expectedType.isInstance(edge.getPartitioner()));
    }

    /**
     * Asserts the output-side properties of the graph node that belongs to {@code operator}.
     *
     * @param graph the stream graph under inspection
     * @param operator the stream whose graph node is checked
     * @param expectedPartitioner required type of the partitioner on the node's first out edge
     * @param expectedSelectedName required first selected name on the node's first out edge
     * @param expectedSelector selector that must be among the node's output selectors
     */
    private static void assertOutput(
            StreamGraph graph,
            DataStream<?> operator,
            Class<?> expectedPartitioner,
            String expectedSelectedName,
            OutputSelector<?> expectedSelector) {
        StreamEdge outEdge = firstOutEdge(graph, operator);
        assertPartitioner(expectedPartitioner, outEdge);
        Assert.assertEquals(expectedSelectedName, outEdge.getSelectedNames().get(0));
        Assert.assertTrue(
                "Output selector is not registered on the node",
                graph.getStreamNode(operator.getId()).getOutputSelectors().contains(expectedSelector));
    }

    /**
     * One-input operator that forwards every record unchanged and remembers the output type
     * passed to {@link #setOutputType(TypeInformation, ExecutionConfig)} so a test can read it back.
     */
    private static class OutputTypeConfigurableOperationWithOneInput
            extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {

        private static final long serialVersionUID = 1L;

        /** Output type received via {@code setOutputType}; {@code null} until that call happens. */
        private TypeInformation<Integer> typeInformation;

        /** Returns the output type recorded by {@code setOutputType}, or {@code null} if none yet. */
        public TypeInformation<Integer> getTypeInformation() {
            return typeInformation;
        }

        @Override
        public void processElement(StreamRecord<Integer> element) {
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) {
            // Watermarks are ignored by this test operator.
        }

        @Override
        public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
            typeInformation = outTypeInfo;
        }
    }

    /**
     * Two-input operator that forwards records from both inputs unchanged and remembers the output
     * type passed to {@link #setOutputType(TypeInformation, ExecutionConfig)} so a test can read it
     * back.
     */
    private static class OutputTypeConfigurableOperationWithTwoInputs
            extends AbstractStreamOperator<Integer>
            implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {

        private static final long serialVersionUID = 1L;

        /** Output type received via {@code setOutputType}; {@code null} until that call happens. */
        private TypeInformation<Integer> typeInformation;

        /** Returns the output type recorded by {@code setOutputType}, or {@code null} if none yet. */
        public TypeInformation<Integer> getTypeInformation() {
            return typeInformation;
        }

        @Override
        public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
            typeInformation = outTypeInfo;
        }

        @Override
        public void processElement1(StreamRecord<Integer> element) throws Exception {
            output.collect(element);
        }

        @Override
        public void processElement2(StreamRecord<Integer> element) throws Exception {
            output.collect(element);
        }

        @Override
        public void processWatermark1(Watermark mark) throws Exception {
            // Watermarks are ignored by this test operator.
        }

        @Override
        public void processWatermark2(Watermark mark) throws Exception {
            // Watermarks are ignored by this test operator.
        }

        @Override
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
            // Deliberately left empty and does not delegate to the superclass.
            // NOTE(review): the tests in this class only build the stream graph and never run the
            // operator; confirm that the inherited 'output' field gets initialised before reusing
            // this class in a test that actually processes records.
        }
    }
}

