/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.streamvertex;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class StreamVertexTest {
    private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
    private static final int PARALLELISM = 1;
    private static final int SOURCE_PARALELISM = 1;
    private static final long MEMORYSIZE = 32L;
    static HashSet<String> resultSet;

    @Test
    public void wrongJobGraph() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        try {
            env.fromCollection(null);
            Assert.fail();
        }
        catch (NullPointerException e) {
            // empty catch block
        }
        try {
            env.fromElements(new Serializable[0]);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            env.generateSequence(-10L, -30L);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            env.setBufferTimeout(-10L);
            Assert.fail();
        }
        catch (IllegalArgumentException e) {
            // empty catch block
        }
        try {
            env.generateSequence(1L, 10L).project(new int[]{2});
            Assert.fail();
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    @Test
    public void coTest() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        DataStreamSource fromStringElements = env.fromElements((Serializable[])new String[]{"aa", "bb", "cc"});
        DataStreamSource generatedSequence = env.generateSequence(0L, 3L);
        fromStringElements.connect((DataStream)generatedSequence).map((CoMapFunction)new CoMap()).addSink((SinkFunction)new SetSink());
        resultSet = new HashSet();
        env.executeTest(32L);
        HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", "2", "3"));
        Assert.assertEquals(expectedSet, resultSet);
    }

    @Test
    public void runStream() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        env.addSource((SourceFunction)new MySource()).setParallelism(1).map((MapFunction)new MyTask()).addSink((SinkFunction)new MySink());
        env.executeTest(32L);
        Assert.assertEquals((long)10L, (long)data.keySet().size());
        for (Integer k : data.keySet()) {
            Assert.assertEquals((Object)(k + 1), (Object)data.get(k));
        }
    }

    private static class SetSink
    implements SinkFunction<String> {
        private static final long serialVersionUID = 1L;

        private SetSink() {
        }

        public void invoke(String value) {
            resultSet.add(value);
        }
    }

    private static class CoMap
    implements CoMapFunction<String, Long, String> {
        private static final long serialVersionUID = 1L;

        private CoMap() {
        }

        public String map1(String value) {
            return value;
        }

        public String map2(Long value) {
            return value.toString();
        }
    }

    public static class MySink
    implements SinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public void invoke(Tuple2<Integer, Integer> tuple) {
            Integer k = (Integer)tuple.getField(0);
            Integer v = (Integer)tuple.getField(1);
            data.put(k, v);
        }
    }

    public static class MyTask
    extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
            Integer i = (Integer)value.f0;
            return new Tuple2((Object)i, (Object)(i + 1));
        }
    }

    public static class MySource
    implements SourceFunction<Tuple1<Integer>> {
        private static final long serialVersionUID = 1L;
        private Tuple1<Integer> tuple = new Tuple1((Object)0);

        public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
            for (int i = 0; i < 10; ++i) {
                this.tuple.f0 = i;
                collector.collect(this.tuple);
            }
        }
    }
}

