package org.apache.flink.streaming.api.graph;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.class */
public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest$OutputTypeConfigurableOperationWithOneInput.class */
    /**
     * Single-input test operator that records the output type it is configured with.
     *
     * <p>The type handed to {@link #setOutputType} is stored and can be read back through
     * {@link #getTypeInformation()}; incoming records are passed on to the output unchanged.
     */
    private static class OutputTypeConfigurableOperationWithOneInput
            extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {

        /** Output type most recently passed to {@link #setOutputType}; {@code null} until that call. */
        TypeInformation<Integer> tpeInformation;

        private OutputTypeConfigurableOperationWithOneInput() {
            // No state to initialise; the output type is supplied through setOutputType.
        }

        /**
         * Stores the given output type. The execution config is not used.
         *
         * @param outTypeInfo the output type to remember
         * @param config ignored
         */
        public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig config) {
            tpeInformation = outTypeInfo;
        }

        /**
         * @return the type stored by the last {@link #setOutputType} call, or {@code null} if
         *     it has not been called
         */
        public TypeInformation<Integer> getTypeInformation() {
            return tpeInformation;
        }

        /** Emits the incoming record unchanged. */
        public void processElement(StreamRecord<Integer> element) throws Exception {
            output.collect(element);
        }

        /** Watermarks are ignored by this operator. */
        public void processWatermark(Watermark mark) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest$OutputTypeConfigurableOperationWithTwoInputs.class */
    /**
     * Two-input test operator that records the output type it is configured with.
     *
     * <p>The type handed to {@link #setOutputType} is stored and can be read back through
     * {@link #getTypeInformation()}; records from either input are passed on to the output
     * unchanged.
     */
    private static class OutputTypeConfigurableOperationWithTwoInputs extends AbstractStreamOperator<Integer> implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
        /** Output type most recently passed to {@link #setOutputType}; {@code null} until that call. */
        TypeInformation<Integer> tpeInformation;

        private OutputTypeConfigurableOperationWithTwoInputs() {
        }

        /**
         * @return the type stored by the last {@link #setOutputType} call, or {@code null} if
         *     it has not been called
         */
        public TypeInformation<Integer> getTypeInformation() {
            return this.tpeInformation;
        }

        /**
         * Stores the given output type. The execution config is not used.
         *
         * @param typeInformation the output type to remember
         * @param executionConfig ignored
         */
        public void setOutputType(TypeInformation<Integer> typeInformation, ExecutionConfig executionConfig) {
            this.tpeInformation = typeInformation;
        }

        /** Emits a record from the first input unchanged. */
        public void processElement1(StreamRecord<Integer> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        /** Emits a record from the second input unchanged. */
        public void processElement2(StreamRecord<Integer> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        /** Watermarks from the first input are ignored. */
        public void processWatermark1(Watermark watermark) throws Exception {
        }

        /** Watermarks from the second input are ignored. */
        public void processWatermark2(Watermark watermark) throws Exception {
        }

        /**
         * Intentionally empty: none of the arguments are used and the superclass is not
         * invoked.
         *
         * <p>NOTE(review): presumably this leaves the inherited {@code output} unset, so
         * {@code processElement1}/{@code processElement2} would fail if the operator were
         * actually run - confirm. The tests in this file only build the stream graph.
         */
        public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Integer>> output) {
        }
    }

    /**
     * Builds a topology that mixes partitioning, split/select and union calls and checks
     * that the generated {@link StreamGraph} carries the expected partitioner, selected
     * names and output selectors on the edges and nodes of the surrounding map operators.
     */
    @Test
    public void testVirtualTransformations() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> rebalanceMap = executionEnvironment.fromElements(1, 10).rebalance().map(new NoOpIntMap());

        // Several partitioning calls in a row; the assertions below expect the last one
        // (broadcast) on the edge into this map.
        DataStream<Integer> broadcastMap = rebalanceMap.forward().global().broadcast().map(new NoOpIntMap());
        broadcastMap.addSink(new NoOpSink());

        EvenOddOutputSelector selector1 = new EvenOddOutputSelector();
        EvenOddOutputSelector selector2 = new EvenOddOutputSelector();
        EvenOddOutputSelector selector3 = new EvenOddOutputSelector();

        // Partitioning applied before split/select.
        DataStream<Integer> map1 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> evenBroadcast = map1.broadcast().split(selector1).select("even");

        // Partitioning applied after split/select.
        DataStream<Integer> map2 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> oddGlobal = map2.split(selector2).select("odd").global();

        // Partitioning on both sides of split/select; shuffle is the last one applied.
        DataStream<Integer> map3 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> firstUnion = evenBroadcast.union(oddGlobal);
        DataStream<Integer> evenShuffle = map3.global().split(selector3).select("even").shuffle();
        firstUnion.union(evenShuffle).map(new NoOpIntMap()).addSink(new NoOpSink());

        StreamGraph streamGraph = executionEnvironment.getStreamGraph();

        StreamEdge rebalanceIn = streamGraph.getStreamNode(rebalanceMap.getId()).getInEdges().get(0);
        Assert.assertTrue(rebalanceIn.getPartitioner() instanceof RebalancePartitioner);

        StreamEdge broadcastIn = streamGraph.getStreamNode(broadcastMap.getId()).getInEdges().get(0);
        Assert.assertTrue(broadcastIn.getPartitioner() instanceof BroadcastPartitioner);
        Assert.assertEquals(rebalanceMap.getId(), broadcastIn.getSourceVertex().getId());

        StreamEdge map1Out = streamGraph.getStreamNode(map1.getId()).getOutEdges().get(0);
        Assert.assertTrue(map1Out.getPartitioner() instanceof BroadcastPartitioner);
        Assert.assertEquals("even", map1Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector1));

        StreamEdge map2Out = streamGraph.getStreamNode(map2.getId()).getOutEdges().get(0);
        Assert.assertTrue(map2Out.getPartitioner() instanceof GlobalPartitioner);
        Assert.assertEquals("odd", map2Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector2));

        StreamEdge map3Out = streamGraph.getStreamNode(map3.getId()).getOutEdges().get(0);
        Assert.assertTrue(map3Out.getPartitioner() instanceof ShufflePartitioner);
        Assert.assertEquals("even", map3Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector3));
    }

    /**
     * Unions three map operators, then applies broadcast and split/select to the union, and
     * checks that each of the three maps ends up with a broadcast out-edge selecting
     * {@code "foo"} and with the shared output selector registered on its node.
     */
    @Test
    public void testVirtualTransformations2() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> rebalanceMap = executionEnvironment.fromElements(1, 10).rebalance().map(new NoOpIntMap());

        DataStream<Integer> map1 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map2 = rebalanceMap.map(new NoOpIntMap());
        DataStream<Integer> map3 = rebalanceMap.map(new NoOpIntMap());

        EvenOddOutputSelector selector = new EvenOddOutputSelector();

        map1.union(map2)
                .union(map3)
                .broadcast()
                .split(selector)
                .select("foo")
                .map(new NoOpIntMap())
                .addSink(new NoOpSink());

        StreamGraph streamGraph = executionEnvironment.getStreamGraph();

        StreamEdge map1Out = streamGraph.getStreamNode(map1.getId()).getOutEdges().get(0);
        Assert.assertTrue(map1Out.getPartitioner() instanceof BroadcastPartitioner);
        Assert.assertEquals("foo", map1Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map1.getId()).getOutputSelectors().contains(selector));

        StreamEdge map2Out = streamGraph.getStreamNode(map2.getId()).getOutEdges().get(0);
        Assert.assertTrue(map2Out.getPartitioner() instanceof BroadcastPartitioner);
        Assert.assertEquals("foo", map2Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map2.getId()).getOutputSelectors().contains(selector));

        StreamEdge map3Out = streamGraph.getStreamNode(map3.getId()).getOutEdges().get(0);
        Assert.assertTrue(map3Out.getPartitioner() instanceof BroadcastPartitioner);
        Assert.assertEquals("foo", map3Out.getSelectedNames().get(0));
        Assert.assertTrue(streamGraph.getStreamNode(map3.getId()).getOutputSelectors().contains(selector));
    }

    /**
     * Checks that generating the stream graph hands the declared output type
     * ({@link BasicTypeInfo#INT_TYPE_INFO}) to a one-input operator that implements
     * {@link OutputTypeConfigurable}.
     */
    @Test
    public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 10);

        OutputTypeConfigurableOperationWithOneInput operator = new OutputTypeConfigurableOperationWithOneInput();

        source.transform("Single input and output type configurable operation", BasicTypeInfo.INT_TYPE_INFO, operator)
                .addSink(new NoOpSink());

        // Only the side effect of graph generation on the operator matters here.
        executionEnvironment.getStreamGraph();

        Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, operator.getTypeInformation());
    }

    /**
     * Checks that generating the stream graph hands the declared output type
     * ({@link BasicTypeInfo#INT_TYPE_INFO}) to a two-input operator that implements
     * {@link OutputTypeConfigurable}.
     */
    @Test
    public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> firstSource = executionEnvironment.fromElements(1, 10);
        DataStreamSource<Integer> secondSource = executionEnvironment.fromElements(2, 11);
        ConnectedStreams<Integer, Integer> connected = firstSource.connect(secondSource);

        OutputTypeConfigurableOperationWithTwoInputs operator = new OutputTypeConfigurableOperationWithTwoInputs();

        connected.transform("Two input and output type configurable operation", BasicTypeInfo.INT_TYPE_INFO, operator)
                .addSink(new NoOpSink());

        // Only the side effect of graph generation on the operator matters here.
        executionEnvironment.getStreamGraph();

        Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, operator.getTypeInformation());
    }
}
