/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.collector;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.junit.Assert;
import org.junit.Test;

public class DirectedOutputTest {
    private static final String TEN = "ten";
    private static final String ODD = "odd";
    private static final String ALL = "all";
    private static final String EVEN_AND_ODD = "evenAndOdd";
    private static final String ODD_AND_TEN = "oddAndTen";
    private static final String EVEN = "even";
    private static final String NON_SELECTED = "nonSelected";
    private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();

    @Test
    public void outputSelectorTest() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1);
        SplitDataStream source = env.generateSequence(1L, 11L).split((OutputSelector)new MyOutputSelector());
        source.select(new String[]{EVEN}).addSink((SinkFunction)new ListSink(EVEN));
        source.select(new String[]{ODD, TEN}).addSink((SinkFunction)new ListSink(ODD_AND_TEN));
        source.select(new String[]{EVEN, ODD}).addSink((SinkFunction)new ListSink(EVEN_AND_ODD));
        source.selectAll().addSink((SinkFunction)new ListSink(ALL));
        env.executeTest(32L);
        Assert.assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
        Assert.assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
        Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
        Assert.assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
    }

    static final class ListSink
    implements SinkFunction<Long> {
        private static final long serialVersionUID = 1L;
        private String name;
        private transient List<Long> list;

        public ListSink(String name) {
            this.name = name;
        }

        public void invoke(Long value) {
            this.list.add(value);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            outputs.put(this.name, new ArrayList());
            this.list = (List)outputs.get(this.name);
        }
    }

    static final class MyOutputSelector
    implements OutputSelector<Long> {
        private static final long serialVersionUID = 1L;
        List<String> outputs = new ArrayList<String>();

        MyOutputSelector() {
        }

        public Iterable<String> select(Long value) {
            this.outputs.clear();
            if (value % 2L == 0L) {
                this.outputs.add(DirectedOutputTest.EVEN);
            } else {
                this.outputs.add(DirectedOutputTest.ODD);
            }
            if (value == 10L) {
                this.outputs.add(DirectedOutputTest.TEN);
            }
            if (value == 11L) {
                this.outputs.add(DirectedOutputTest.NON_SELECTED);
            }
            return this.outputs;
        }
    }

    static final class MyMap
    implements MapFunction<Long, Long> {
        private static final long serialVersionUID = 1L;

        MyMap() {
        }

        public Long map(Long value) throws Exception {
            return value;
        }
    }
}

