/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class SelfConnectionITCase
extends AbstractTestBaseJUnit4 {
    @Test
    public void differentDataStreamSameChain() throws Exception {
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 3, 5});
        SingleOutputStreamOperator stringMap = src.map((MapFunction & Serializable)value -> "x " + value);
        stringMap.connect((DataStream)src).map((CoMapFunction)new CoMapFunction<String, Integer, String>(){

            public String map1(String value) {
                return value;
            }

            public String map2(Integer value) {
                return String.valueOf(value + 1);
            }
        }).addSink((SinkFunction)resultSink);
        env.execute();
        List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
        List result = resultSink.getResult();
        Collections.sort(expected);
        Collections.sort(result);
        Assert.assertEquals(expected, (Object)result);
    }

    @Test
    public void differentDataStreamDifferentChain() throws Exception {
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        SingleOutputStreamOperator src = env.fromData((Object[])new Integer[]{1, 3, 5}).disableChaining();
        KeyedStream stringMap = src.flatMap((FlatMapFunction)new FlatMapFunction<Integer, String>(){

            public void flatMap(Integer value, Collector<String> out) throws Exception {
                out.collect((Object)("x " + value));
            }
        }).keyBy(String::length);
        KeyedStream longMap = src.map((MapFunction & Serializable)value -> value + 1).keyBy(Long::intValue);
        stringMap.connect((DataStream)longMap).map((CoMapFunction)new CoMapFunction<String, Long, String>(){

            public String map1(String value) {
                return value;
            }

            public String map2(Long value) {
                return value.toString();
            }
        }).addSink((SinkFunction)resultSink);
        env.execute();
        List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
        List result = resultSink.getResult();
        Collections.sort(expected);
        Collections.sort(result);
        Assert.assertEquals(expected, (Object)result);
    }
}

