/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.CollectSink;
import org.apache.flink.contrib.streaming.SocketStreamIterator;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class DataStreamUtils {
    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException {
        InetAddress clientAddress;
        TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
        SocketStreamIterator iter = new SocketStreamIterator(serializer);
        StreamExecutionEnvironment env = stream.getExecutionEnvironment();
        if (env instanceof RemoteStreamEnvironment) {
            String host = ((RemoteStreamEnvironment)env).getHost();
            int port = ((RemoteStreamEnvironment)env).getPort();
            try {
                clientAddress = ConnectionUtils.findConnectingAddress((InetSocketAddress)new InetSocketAddress(host, port), (long)2000L, (long)400L);
            }
            catch (Exception e) {
                throw new IOException("Could not determine an suitable network address to receive back data from the streaming program.", e);
            }
        }
        try {
            clientAddress = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new IOException("Could not determine this machines own local address to receive back data from the streaming program.", e);
        }
        DataStreamSink sink = stream.addSink(new CollectSink(clientAddress, iter.getPort(), serializer));
        sink.setParallelism(1);
        new CallExecute(env, iter).start();
        return iter;
    }

    private DataStreamUtils() {
    }

    private static class CallExecute
    extends Thread {
        private final StreamExecutionEnvironment toTrigger;
        private final SocketStreamIterator<?> toNotify;

        private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
            this.toTrigger = toTrigger;
            this.toNotify = toNotify;
        }

        @Override
        public void run() {
            try {
                this.toTrigger.execute();
            }
            catch (Throwable t) {
                this.toNotify.notifyOfError(t);
            }
        }
    }
}

