package org.apache.flink.streaming.api.functions.source;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.class */
/**
 * Source function that reads text from a TCP socket and emits one {@code String} per
 * delimiter-terminated record.
 *
 * <p>Characters are read one at a time. When the configured delimiter is read, the characters
 * buffered so far are emitted as a record; carriage returns ({@code '\r'}) are dropped. When the
 * remote side closes the connection (end of stream), the function tries to reconnect to the same
 * host and port. A negative {@code maxRetry} means "retry forever"; otherwise at most
 * {@code maxRetry} reconnection attempts are made per lost connection.
 *
 * <p>{@link #cancel()} may be called from a different thread than {@link #run}; it flips the
 * running flag and closes the currently active socket so that a blocked read or connect returns.
 */
public class SocketTextStreamFunction extends RichSourceFunction<String> {
    protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
    private static final long serialVersionUID = 1;

    /** Connect timeout in milliseconds passed to {@link Socket#connect}; 0 means no timeout. */
    private static final int CONNECTION_TIMEOUT_TIME = 0;

    /** Pause in milliseconds after a refused reconnection attempt. */
    private static final int CONNECTION_RETRY_SLEEP = 1000;

    private final String hostname;
    private final int port;
    private final char delimiter;
    private final long maxRetry;
    private final boolean retryForever;

    /**
     * The socket currently in use. Written by the reading thread (open/reconnect) and read by
     * {@link #cancel()}, hence volatile. Sockets are not serializable, hence transient.
     */
    private transient volatile Socket socket;

    private volatile boolean isRunning;

    /**
     * Creates a socket text source.
     *
     * @param str host name to connect to
     * @param i port to connect to
     * @param c character that terminates a record
     * @param j maximum number of reconnection attempts after the connection is lost; a negative
     *     value means retry without limit
     */
    public SocketTextStreamFunction(String str, int i, char c, long j) {
        this.hostname = str;
        this.port = i;
        this.delimiter = c;
        this.maxRetry = j;
        this.retryForever = j < 0;
    }

    /**
     * Opens the initial connection to the configured host and port and marks the source as
     * running.
     *
     * @param configuration passed through to the superclass
     * @throws Exception if the superclass fails to open or the connection cannot be established
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        Socket initial = new Socket();
        this.socket = initial;
        try {
            initial.connect(new InetSocketAddress(this.hostname, this.port), CONNECTION_TIMEOUT_TIME);
        } catch (IOException e) {
            // Do not leak the unconnected socket when the initial connect fails.
            closeQuietly(initial);
            throw e;
        }
        this.isRunning = true;
    }

    /**
     * Streams records from the socket opened in {@link #open(Configuration)}.
     *
     * @param sourceContext context used to emit records
     * @throws Exception on I/O failure while the source is still running
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        streamFromSocket(sourceContext, this.socket);
    }

    /**
     * Reads characters from {@code socket} until the source is cancelled or the connection is
     * lost and cannot be re-established, emitting one record per delimiter. Any characters still
     * buffered when the loop ends are emitted as a final record. The socket in use at the end is
     * always closed.
     *
     * @param sourceContext context used to emit records
     * @param socket connected socket to start reading from
     * @throws Exception on I/O failure while the source is still running, or if the thread is
     *     interrupted while waiting between reconnection attempts
     */
    public void streamFromSocket(SourceFunction.SourceContext<String> sourceContext, Socket socket) throws Exception {
        Socket current = socket;
        try {
            // Only this thread touches the buffer, so the unsynchronized builder is sufficient.
            StringBuilder record = new StringBuilder();
            BufferedReader reader = new BufferedReader(new InputStreamReader(current.getInputStream()));
            while (this.isRunning) {
                try {
                    int read = reader.read();
                    if (read == -1) {
                        // End of stream: the remote side closed the connection.
                        current.close();
                        Socket reconnected = reconnect();
                        if (reconnected == null) {
                            if (this.isRunning) {
                                LOG.error("Could not reconnect to server socket.");
                            }
                            break;
                        }
                        current = reconnected;
                        LOG.info("Server socket is reconnected.");
                        reader = new BufferedReader(new InputStreamReader(current.getInputStream()));
                    } else if (read == this.delimiter) {
                        sourceContext.collect(record.toString());
                        record.setLength(0);
                    } else if (read != '\r') {
                        record.append((char) read);
                    }
                } catch (SocketException e) {
                    // cancel() closes the socket to unblock this thread; only a socket error
                    // that happens while still running is a real failure.
                    if (this.isRunning) {
                        throw e;
                    }
                }
            }
            if (record.length() > 0) {
                sourceContext.collect(record.toString());
            }
        } finally {
            current.close();
        }
    }

    /**
     * Tries to re-establish the connection to the configured host and port.
     *
     * <p>Each candidate socket is published to {@link #socket} before connecting so that
     * {@link #cancel()} can close it and thereby abort a blocking connect or a later read.
     *
     * @return the newly connected socket, or {@code null} if the retry budget is exhausted or the
     *     source was cancelled
     * @throws IOException if connecting fails for a reason other than a refused connection
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    private Socket reconnect() throws IOException, InterruptedException {
        long attempts = 0;
        while (this.isRunning && (this.retryForever || attempts < this.maxRetry)) {
            if (!this.retryForever) {
                attempts++;
            }
            LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
            Socket candidate = new Socket();
            this.socket = candidate;
            try {
                candidate.connect(new InetSocketAddress(this.hostname, this.port), CONNECTION_TIMEOUT_TIME);
                return candidate;
            } catch (ConnectException e) {
                // Server not reachable yet: release the failed socket and wait before retrying.
                closeQuietly(candidate);
                Thread.sleep(CONNECTION_RETRY_SLEEP);
            } catch (IOException e) {
                closeQuietly(candidate);
                throw e;
            }
        }
        return null;
    }

    /** Closes {@code s}, ignoring failures; used only on sockets that are being discarded. */
    private static void closeQuietly(Socket s) {
        try {
            s.close();
        } catch (IOException ignored) {
            // Best effort: the socket is being thrown away anyway.
        }
    }

    /**
     * Stops the source: clears the running flag and closes the active socket so that a thread
     * blocked in a read or connect on it is released.
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
        Socket active = this.socket;
        if (active == null || active.isClosed()) {
            return;
        }
        try {
            active.close();
        } catch (IOException e) {
            LOG.error("Could not close open socket", e);
        }
    }
}
