package org.apache.flink.contrib.streaming;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;

/* loaded from: input_file:org/apache/flink/contrib/streaming/DataStreamUtils.class */
public final class DataStreamUtils {

    /* loaded from: input_file:org/apache/flink/contrib/streaming/DataStreamUtils$CallExecute.class */
    private static class CallExecute<OUT> extends Thread {
        DataStream<OUT> stream;

        public CallExecute(DataStream<OUT> dataStream) {
            this.stream = dataStream;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.stream.getExecutionEnvironment().execute();
            } catch (Exception e) {
                throw new RuntimeException("Exception in execute()", e);
            }
        }
    }

    public static <OUT> Iterator<OUT> collect(DataStream<OUT> dataStream) {
        InetAddress localHost;
        TypeSerializer createSerializer = dataStream.getType().createSerializer(dataStream.getExecutionEnvironment().getConfig());
        DataStreamIterator dataStreamIterator = new DataStreamIterator(createSerializer);
        RemoteStreamEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        if (executionEnvironment instanceof RemoteStreamEnvironment) {
            try {
                localHost = ConnectionUtils.findConnectingAddress(new InetSocketAddress(executionEnvironment.getHost(), executionEnvironment.getPort()), 2000L, 400L);
            } catch (IOException e) {
                throw new RuntimeException("IOException while trying to connect to the master", e);
            }
        } else {
            try {
                localHost = InetAddress.getLocalHost();
            } catch (UnknownHostException e2) {
                throw new RuntimeException("getLocalHost failed", e2);
            }
        }
        dataStream.addSink(new CollectSink(localHost, dataStreamIterator.getPort(), createSerializer)).setParallelism(1);
        new CallExecute(dataStream).start();
        return dataStreamIterator;
    }

    private DataStreamUtils() {
        throw new RuntimeException();
    }
}
