/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.plan;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.plan.PythonPlanReceiver;
import org.apache.flink.python.api.streaming.plan.PythonPlanSender;
import org.apache.flink.python.api.streaming.util.StreamPrinter;

/**
 * Starts an external python process that runs the plan file and exchanges records with it over a
 * local TCP connection.
 *
 * <p>{@link #open(String, String)} binds a server socket on an ephemeral port, launches the python
 * process, hands it the port via its standard input and then waits for the process to connect.
 * {@link #close()} terminates the process and releases the sockets.
 *
 * <p>NOTE(review): the class is declared {@link Serializable}, but {@link Process},
 * {@link ServerSocket} and {@link Socket} are not serializable, so an instance can only be
 * serialized while those fields are still {@code null} (i.e. before {@code open} was called).
 */
public class PythonPlanStreamer
implements Serializable {
    protected PythonPlanSender sender;
    protected PythonPlanReceiver receiver;
    private Process process;
    private ServerSocket server;
    private Socket socket;

    /**
     * Reads the next record from the python process; equivalent to {@code getRecord(false)}.
     *
     * @return the record returned by the receiver
     * @throws IOException if the receiver fails to read the record
     */
    public Object getRecord() throws IOException {
        return this.getRecord(false);
    }

    /**
     * Reads the next record from the python process.
     *
     * @param normalize passed through unchanged to {@code PythonPlanReceiver.getRecord}
     * @return the record returned by the receiver
     * @throws IOException if the receiver fails to read the record
     */
    public Object getRecord(boolean normalize) throws IOException {
        return this.receiver.getRecord(normalize);
    }

    /**
     * Sends a record to the python process.
     *
     * @param record the record handed to the sender
     * @throws IOException if the sender fails to write the record
     */
    public void sendRecord(Object record) throws IOException {
        this.sender.sendRecord(record);
    }

    /**
     * Starts the python process and establishes the connection used by the sender and receiver.
     *
     * <p>If any step fails, everything acquired so far (server socket, python process, connection)
     * is released again via {@link #close()} before the exception propagates.
     *
     * @param tmpPath path prefix that is concatenated with the plan file name
     * @param args argument string appended verbatim to the python command line
     * @throws IOException if the sockets cannot be set up or the process cannot be started
     * @throws RuntimeException if the python binary is invalid or the plan process exits within
     *     the start-up grace period
     */
    public void open(String tmpPath, String args) throws IOException {
        this.server = new ServerSocket(0);
        boolean opened = false;
        try {
            this.startPython(tmpPath, args);
            this.socket = this.server.accept();
            this.sender = new PythonPlanSender(this.socket.getOutputStream());
            this.receiver = new PythonPlanReceiver(this.socket.getInputStream());
            opened = true;
        } finally {
            if (!opened) {
                // Do not leak the bound port or a half-started process when start-up fails.
                this.close();
            }
        }
    }

    /**
     * Launches the plan file with the configured python binary and tells it which port to
     * connect to.
     */
    private void startPython(String tmpPath, String args) throws IOException {
        String pythonBinaryPath = PythonPlanBinder.usePython3 ? PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH : PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
        try {
            // Probe run: only checks that the binary can be executed at all. The probe process
            // is destroyed right away so that it does not linger.
            Process probe = Runtime.getRuntime().exec(pythonBinaryPath);
            probe.destroy();
        } catch (IOException ex) {
            throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.", ex);
        }

        // NOTE(review): Runtime.exec(String) splits the command on whitespace, so paths that
        // contain spaces will be broken apart. 'args' is appended without a separator and is
        // presumably expected to start with a space - confirm against the caller.
        this.process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tmpPath + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME + args);
        new StreamPrinter(this.process.getInputStream()).start();
        new StreamPrinter(this.process.getErrorStream()).start();

        // Give the script a moment so that an immediate failure can be detected below.
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException ex) {
            // Keep the interruption visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        this.failIfProcessExited();

        java.io.OutputStream stdin = this.process.getOutputStream();
        stdin.write("plan\n".getBytes(StandardCharsets.UTF_8));
        stdin.write((this.server.getLocalPort() + "\n").getBytes(StandardCharsets.UTF_8));
        stdin.flush();
    }

    /** Throws if the python process has already terminated; returns normally while it is running. */
    private void failIfProcessExited() {
        final int exitCode;
        try {
            exitCode = this.process.exitValue();
        } catch (IllegalThreadStateException ignored) {
            // exitValue() throws while the process is still alive, which is the expected case.
            return;
        }
        if (exitCode != 0) {
            throw new RuntimeException("Plan file caused an error. Check log-files for details.");
        }
        throw new RuntimeException("Plan file exited prematurely without an error.");
    }

    /**
     * Terminates the python process if it is still running and closes the connection and the
     * server socket. Safe to call when {@code open} was never called or failed part-way, and safe
     * to call more than once.
     */
    public void close() {
        if (this.process != null) {
            try {
                this.process.exitValue();
            } catch (IllegalThreadStateException ignored) {
                // Process is still running.
                this.process.destroy();
            }
        }
        PythonPlanStreamer.closeQuietly(this.socket);
        PythonPlanStreamer.closeQuietly(this.server);
    }

    /** Closes the given resource if it is non-null; failures are ignored (best-effort cleanup). */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing sensible can be done about a failed close during shutdown.
        }
    }
}

