package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/SelfConnectionTest.class */
public class SelfConnectionTest implements Serializable {
    private static final long serialVersionUID = 1;
    private final int MEMORY_SIZE = 32;
    private static List<String> expected;

    @Test
    public void sameDataStreamTest() {
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(3, 32L);
        TestListResultSink testListResultSink = new TestListResultSink();
        Timestamp<Integer> timestamp = new Timestamp<Integer>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.1
            private static final long serialVersionUID = 1;

            public long getTimestamp(Integer num) {
                return num.intValue();
            }
        };
        KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.2
            private static final long serialVersionUID = 1;

            public Integer getKey(Integer num) throws Exception {
                return num;
            }
        };
        DataStreamSource fromElements = testStreamEnvironment.fromElements(new Integer[]{1, 3, 5});
        fromElements.join(fromElements).onWindow(50L, timestamp, timestamp).where(keySelector).equalTo(keySelector).map(new MapFunction<Tuple2<Integer, Integer>, String>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.3
            private static final long serialVersionUID = 1;

            public String map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return tuple2.toString();
            }
        }).addSink(testListResultSink);
        try {
            testStreamEnvironment.execute();
            expected = new ArrayList();
            expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)"));
            List result = testListResultSink.getResult();
            Collections.sort(expected);
            Collections.sort(result);
            Assert.assertEquals(expected, result);
        } catch (Exception e) {
            Assert.fail();
            e.printStackTrace();
        }
    }

    @Test
    public void differentDataStreamSameChain() {
        TestListResultSink testListResultSink = new TestListResultSink();
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(1, 32L);
        DataStreamSource fromElements = testStreamEnvironment.fromElements(new Integer[]{1, 3, 5});
        fromElements.map(new MapFunction<Integer, String>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.4
            private static final long serialVersionUID = 1;

            public String map(Integer num) throws Exception {
                return "x " + num;
            }
        }).connect(fromElements).map(new CoMapFunction<String, Integer, String>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.5
            private static final long serialVersionUID = 1;

            public String map1(String str) {
                return str;
            }

            public String map2(Integer num) {
                return String.valueOf(num.intValue() + 1);
            }
        }).addSink(testListResultSink);
        try {
            testStreamEnvironment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
        expected = new ArrayList();
        expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
        List result = testListResultSink.getResult();
        Collections.sort(expected);
        Collections.sort(result);
        Assert.assertEquals(expected, result);
    }

    @Test
    public void differentDataStreamDifferentChain() {
        TestListResultSink testListResultSink = new TestListResultSink();
        TestStreamEnvironment testStreamEnvironment = new TestStreamEnvironment(3, 32L);
        SingleOutputStreamOperator disableChaining = testStreamEnvironment.fromElements(new Integer[]{1, 3, 5}).disableChaining();
        disableChaining.flatMap(new FlatMapFunction<Integer, String>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.7
            private static final long serialVersionUID = 1;

            public void flatMap(Integer num, Collector<String> collector) throws Exception {
                collector.collect("x " + num);
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Integer) obj, (Collector<String>) collector);
            }
        }).groupBy(new KeySelector<String, Integer>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.6
            private static final long serialVersionUID = 1;

            public Integer getKey(String str) throws Exception {
                return Integer.valueOf(str.length());
            }
        }).connect(disableChaining.map(new MapFunction<Integer, Long>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.9
            private static final long serialVersionUID = 1;

            public Long map(Integer num) throws Exception {
                return Long.valueOf(num.intValue() + 1);
            }
        }).groupBy(new KeySelector<Long, Long>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.8
            private static final long serialVersionUID = 1;

            public Long getKey(Long l) throws Exception {
                return l;
            }
        })).map(new CoMapFunction<String, Long, String>() { // from class: org.apache.flink.streaming.api.operators.co.SelfConnectionTest.10
            private static final long serialVersionUID = 1;

            public String map1(String str) {
                return str;
            }

            public String map2(Long l) {
                return l.toString();
            }
        }).addSink(testListResultSink);
        try {
            testStreamEnvironment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
        expected = new ArrayList();
        expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
        List result = testListResultSink.getResult();
        Collections.sort(expected);
        Collections.sort(result);
        Assert.assertEquals(expected, result);
    }
}
