package org.apache.flink.python.api.streaming.plan;

import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.util.StreamPrinter;

/* loaded from: input_file:org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.class */
public class PythonPlanStreamer implements Serializable {
    protected PythonPlanSender sender;
    protected PythonPlanReceiver receiver;
    private Process process;
    private ServerSocket server;
    private Socket socket;

    public Object getRecord() throws IOException {
        return getRecord(false);
    }

    public Object getRecord(boolean z) throws IOException {
        return this.receiver.getRecord(z);
    }

    public void sendRecord(Object obj) throws IOException {
        this.sender.sendRecord(obj);
    }

    public void open(String str, String str2) throws IOException {
        this.server = new ServerSocket(0);
        startPython(str, str2);
        this.socket = this.server.accept();
        this.sender = new PythonPlanSender(this.socket.getOutputStream());
        this.receiver = new PythonPlanReceiver(this.socket.getInputStream());
    }

    private void startPython(String str, String str2) throws IOException {
        int exitValue;
        String str3 = PythonPlanBinder.usePython3 ? PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH : PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
        try {
            Runtime.getRuntime().exec(str3);
            this.process = Runtime.getRuntime().exec(str3 + " -B " + str + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME + str2);
            new StreamPrinter(this.process.getInputStream()).start();
            new StreamPrinter(this.process.getErrorStream()).start();
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
            }
            try {
                exitValue = this.process.exitValue();
            } catch (IllegalThreadStateException e2) {
            }
            if (exitValue != 0) {
                throw new RuntimeException("Plan file caused an error. Check log-files for details.");
            }
            if (exitValue == 0) {
                throw new RuntimeException("Plan file exited prematurely without an error.");
            }
            this.process.getOutputStream().write("plan\n".getBytes());
            this.process.getOutputStream().write((this.server.getLocalPort() + "\n").getBytes());
            this.process.getOutputStream().flush();
        } catch (IOException e3) {
            throw new RuntimeException(str3 + " does not point to a valid python binary.");
        }
    }

    public void close() {
        try {
            this.process.exitValue();
        } catch (IllegalThreadStateException e) {
            this.process.destroy();
        } catch (NullPointerException e2) {
        }
    }
}
