package org.apache.flink.streaming.api.function.source;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.class */
/**
 * Source function that connects to a TCP socket and emits the received text as strings,
 * split on a configurable delimiter character.
 *
 * <p>Carriage-return characters are dropped from the emitted records unless the
 * carriage return itself is the configured delimiter. Bytes are decoded with the
 * platform default charset, because the {@link InputStreamReader} is created without
 * an explicit charset.
 */
public class SocketTextStreamFunction extends RichSourceFunction<String> {
    private static final long serialVersionUID = 1;

    /** Connect timeout in milliseconds; {@code Socket.connect} treats 0 as "wait indefinitely". */
    private static final int CONNECTION_TIMEOUT_TIME = 0;

    private final String hostname;
    private final int port;
    private final char delimiter;

    /** Created in {@link #open}; a live socket is not serializable, hence transient. */
    private transient Socket socket;

    /**
     * Creates a source that reads from the given endpoint.
     *
     * @param hostname host to connect to
     * @param port TCP port to connect to
     * @param delimiter character that terminates one record
     */
    public SocketTextStreamFunction(String hostname, int port, char delimiter) {
        this.hostname = hostname;
        this.port = port;
        this.delimiter = delimiter;
    }

    /**
     * Opens the socket connection to the configured host and port.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws Exception if the superclass fails to open or the connection cannot be established
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.socket = new Socket();
        this.socket.connect(new InetSocketAddress(this.hostname, this.port), CONNECTION_TIMEOUT_TIME);
    }

    /**
     * Streams records from the socket into the collector until the stream ends or the
     * socket is closed.
     *
     * @param collector receives one string per delimited record
     * @throws IllegalStateException if {@link #open} has not created the socket yet
     * @throws Exception if reading from the socket fails
     */
    @Override
    public void invoke(Collector<String> collector) throws Exception {
        if (this.socket == null) {
            throw new IllegalStateException("open() must be called before invoke()");
        }
        // streamFromSocket only returns at end of stream or once the socket is no longer
        // usable, so a single call is enough. Re-invoking it in a loop would spin forever
        // after the peer closes the connection, because the local socket still reports
        // itself as connected and not closed in that situation.
        if (!this.socket.isClosed() && this.socket.isConnected()) {
            streamFromSocket(collector, this.socket);
        }
    }

    /**
     * Reads characters from the socket and emits one string per delimiter-terminated
     * record. A non-empty trailing record without a delimiter is emitted when reading stops.
     *
     * @param collector receives the records
     * @param socket socket to read from; it is not closed by this method
     * @throws Exception if reading from the socket fails
     */
    public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
        StringBuilder record = new StringBuilder();
        // The reader is intentionally not closed here: closing it would also close the
        // socket's input stream, and the socket's lifecycle is handled in close().
        BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        while (true) {
            int read = reader.read();
            if (read == -1 || socket.isClosed() || !socket.isConnected()) {
                break;
            }
            if (read == this.delimiter) {
                collector.collect(record.toString());
                record.setLength(0);
            } else if (read != '\r') {
                // Carriage returns are dropped so "\r\n" input yields clean records.
                record.append((char) read);
            }
        }
        if (record.length() > 0) {
            collector.collect(record.toString());
        }
    }

    /**
     * Closes the socket, if one was opened, and then closes the superclass.
     *
     * @throws Exception if closing the socket or the superclass fails
     */
    public void close() throws Exception {
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        } finally {
            // Always let the superclass clean up, even if closing the socket failed.
            super.close();
        }
    }
}
