/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.data.PythonReceiver;
import org.apache.flink.python.api.streaming.data.PythonSender;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts an external python process for one operator subtask and exchanges data with it.
 *
 * <p>Control signals travel over a local TCP socket: the python process is told the port of a
 * {@link ServerSocket} opened here and connects back to it. Record data is handed to a
 * {@link PythonSender} / {@link PythonReceiver} pair that are opened on per-subtask file paths
 * below {@code PythonPlanBinder.FLINK_TMP_DATA_DIR}.
 *
 * <p>Signals read from the python process (see the {@code SIGNAL_*} constants):
 * <ul>
 *   <li>{@code SIGNAL_BUFFER_REQUEST} / {@code _G0} / {@code _G1}: python asks for the next buffer
 *       of input (group 0 or group 1);</li>
 *   <li>{@code SIGNAL_FINISHED}: python is done, streaming returns;</li>
 *   <li>{@code SIGNAL_ERROR}: python failed, a {@link RuntimeException} is raised;</li>
 *   <li>any other value is forwarded to {@link PythonReceiver#collectBuffer} (presumably the size
 *       of a result buffer - confirm against the receiver).</li>
 * </ul>
 *
 * <p>NOTE(review): the class is {@link Serializable} but holds non-transient process/socket
 * fields; serialization presumably only happens before {@link #open()} - confirm with callers.
 */
public class PythonStreamer
implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
    private static final int SIGNAL_BUFFER_REQUEST = 0;
    private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    private static final int SIGNAL_FINISHED = -1;
    private static final int SIGNAL_ERROR = -2;
    private static final byte SIGNAL_LAST = 32;
    private final int id;
    private final boolean usePython3;
    private final String planArguments;
    private String inputFilePath;
    private String outputFilePath;
    private Process process;
    private Thread shutdownThread;
    protected ServerSocket server;
    protected Socket socket;
    protected DataInputStream in;
    protected DataOutputStream out;
    protected int port;
    protected PythonSender sender;
    protected PythonReceiver receiver;
    protected StringBuilder msg = new StringBuilder();
    protected final AbstractRichFunction function;

    /**
     * Creates a streamer for the given function.
     *
     * @param function      rich function whose runtime context supplies subtask index, task name,
     *                      distributed cache and broadcast variables
     * @param id            operator id; part of the data file names and sent to the python process
     * @param usesByteArray passed through to the {@link PythonReceiver} constructor
     */
    public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) {
        this.id = id;
        this.usePython3 = PythonPlanBinder.usePython3;
        this.planArguments = PythonPlanBinder.arguments.toString();
        this.sender = new PythonSender();
        this.receiver = new PythonReceiver(usesByteArray);
        this.function = function;
    }

    /**
     * Opens the server socket on an ephemeral port and starts the python process.
     *
     * @throws IOException if the socket, the data files or the process cannot be set up
     */
    public void open() throws IOException {
        this.server = new ServerSocket(0);
        this.startPython();
    }

    /**
     * Launches the python process, hands it its configuration via stdin and waits for it to
     * connect back to {@link #server}.
     *
     * @throws IOException      if opening the data files, starting the process or accepting the
     *                          connection fails
     * @throws RuntimeException if the python binary cannot be executed or the process exits
     *                          within the start-up grace period
     */
    private void startPython() throws IOException {
        int subtaskIndex = this.function.getRuntimeContext().getIndexOfThisSubtask();
        String filePrefix = PythonPlanBinder.FLINK_TMP_DATA_DIR + "/" + this.id + subtaskIndex;
        this.outputFilePath = filePrefix + "output";
        this.inputFilePath = filePrefix + "input";
        this.sender.open(this.inputFilePath);
        this.receiver.open(this.outputFilePath);

        String path = this.function.getRuntimeContext().getDistributedCache().getFile("flink").getAbsolutePath();
        String planPath = path + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
        String pythonBinaryPath = this.usePython3 ? PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH : PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;

        // Probe that the binary can be executed at all. The probe process is destroyed right
        // away; otherwise every call would leave an idle interpreter behind.
        try {
            Runtime.getRuntime().exec(pythonBinaryPath).destroy();
        }
        catch (IOException ex) {
            throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.", ex);
        }

        // The single-string exec() form splits on whitespace; planArguments relies on that.
        this.process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + this.planArguments);
        new StreamPrinter(this.process.getInputStream()).start();
        // stderr output is additionally collected in msg so it can be appended to error messages.
        new StreamPrinter(this.process.getErrorStream(), true, this.msg).start();

        // Make sure the external process does not outlive the JVM.
        this.shutdownThread = new Thread(){

            @Override
            public void run() {
                try {
                    PythonStreamer.this.destroyProcess();
                }
                catch (IOException ignored) {
                    // Best effort: the JVM is shutting down, nothing left to report to.
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);

        // Hand the configuration to the python side, one value per line.
        OutputStream processOutput = this.process.getOutputStream();
        processOutput.write("operator\n".getBytes());
        processOutput.write(("" + this.server.getLocalPort() + "\n").getBytes());
        processOutput.write((this.id + "\n").getBytes());
        processOutput.write((subtaskIndex + "\n").getBytes());
        processOutput.write((this.inputFilePath + "\n").getBytes());
        processOutput.write((this.outputFilePath + "\n").getBytes());
        processOutput.flush();

        // Grace period in which a broken script gets the chance to fail.
        PythonStreamer.sleepPreservingInterrupt(2000L);

        if (PythonStreamer.hasTerminated(this.process)) {
            throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " terminated prematurely." + this.msg);
        }

        // Process is still running: wait for it to connect back.
        this.socket = this.server.accept();
        this.in = new DataInputStream(this.socket.getInputStream());
        this.out = new DataOutputStream(this.socket.getOutputStream());
    }

    /**
     * Closes sockets, sender and receiver, destroys the python process and removes the shutdown
     * hook. Each resource is closed independently, so a failure (or a resource that was never
     * opened) does not prevent the remaining ones from being released.
     *
     * @throws IOException if destroying the external process fails
     */
    public void close() throws IOException {
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        }
        catch (Exception e) {
            PythonStreamer.logCloseFailure(e);
        }
        try {
            if (this.server != null) {
                this.server.close();
            }
        }
        catch (Exception e) {
            PythonStreamer.logCloseFailure(e);
        }
        try {
            this.sender.close();
        }
        catch (Exception e) {
            PythonStreamer.logCloseFailure(e);
        }
        try {
            this.receiver.close();
        }
        catch (Exception e) {
            PythonStreamer.logCloseFailure(e);
        }
        this.destroyProcess();
        if (this.shutdownThread != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
            }
            catch (IllegalStateException ignored) {
                // JVM shutdown already in progress; the hook can no longer be removed.
            }
        }
    }

    /** Logs a failure that occurred while releasing one of the streamer's resources. */
    private static void logCloseFailure(Exception e) {
        LOG.error("Exception occurred while closing Streamer. :" + e.getMessage(), e);
    }

    /**
     * Reports whether the given process has exited.
     *
     * @param p process to check
     * @return {@code true} if an exit value is available, {@code false} if it is still running
     */
    private static boolean hasTerminated(Process p) {
        try {
            p.exitValue();
            return true;
        }
        catch (IllegalThreadStateException stillRunning) {
            return false;
        }
    }

    /**
     * Sleeps for the given time; if interrupted, returns early with the thread's interrupt flag
     * restored so that callers further up can still observe the interruption.
     */
    private static void sleepPreservingInterrupt(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Terminates the external process if it was started and is still running. For a
     * {@code java.lang.UNIXProcess} the pid is read reflectively and {@code kill -9} is issued
     * before {@link Process#destroy()}; if the pid cannot be read only {@code destroy()} is used.
     *
     * @throws IOException if the {@code kill} command cannot be started
     */
    private void destroyProcess() throws IOException {
        if (this.process == null || PythonStreamer.hasTerminated(this.process)) {
            return;
        }
        if (this.process.getClass().getName().equals("java.lang.UNIXProcess")) {
            int pid;
            try {
                Field f = this.process.getClass().getDeclaredField("pid");
                f.setAccessible(true);
                pid = f.getInt(this.process);
            }
            catch (Exception e) {
                // Reflection not possible on this JVM; fall back to the regular destroy.
                this.process.destroy();
                return;
            }
            String[] args = new String[]{"kill", "-9", "" + pid};
            Runtime.getRuntime().exec(args);
        }
        this.process.destroy();
    }

    /**
     * Tells the python process that a buffer was written.
     *
     * @param size    value returned by the sender for the written buffer
     * @param hasNext whether more data will follow; if not, {@code SIGNAL_LAST} is sent as flag
     */
    private void sendWriteNotification(int size, boolean hasNext) throws IOException {
        this.out.writeInt(size);
        this.out.writeByte(hasNext ? 0 : SIGNAL_LAST);
        this.out.flush();
    }

    /** Acknowledges to the python process that a result buffer was consumed. */
    private void sendReadConfirmation() throws IOException {
        this.out.writeByte(1);
        this.out.flush();
    }

    /**
     * Waits briefly and then builds the exception for a {@code SIGNAL_ERROR} from the python
     * process. The wait presumably gives the stderr printer time to fill {@link #msg}.
     */
    private RuntimeException processErrorFailure() {
        PythonStreamer.sleepPreservingInterrupt(2000L);
        return new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + this.msg);
    }

    /** Builds the exception for a socket timeout, keeping the timeout as cause. */
    private RuntimeException stoppedRespondingFailure(SocketTimeoutException cause) {
        return new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " stopped responding." + this.msg, cause);
    }

    /**
     * Sends all broadcast variables to the python process: first the number of variables, then
     * for each one its name followed by its elements. Every element is preceded by a {@code 1}
     * byte and every variable is terminated by a {@code 0} byte.
     *
     * @param config configuration holding {@code PLANBINDER_BCVAR_COUNT} and the names under
     *               {@code PLANBINDER_BCVAR_<index>}
     * @throws IOException      if writing to the socket fails
     * @throws RuntimeException if the socket times out
     */
    public final void sendBroadCastVariables(Configuration config) throws IOException {
        try {
            int broadcastCount = config.getInteger("PLANBINDER_BCVAR_COUNT", 0);
            String[] names = new String[broadcastCount];
            for (int x = 0; x < names.length; ++x) {
                names[x] = config.getString("PLANBINDER_BCVAR_" + x, null);
            }
            this.out.write(new SerializationUtils.IntSerializer().serializeWithoutTypeInfo(broadcastCount));
            SerializationUtils.StringSerializer stringSerializer = new SerializationUtils.StringSerializer();
            for (String name : names) {
                Iterator<?> bcv = this.function.getRuntimeContext().getBroadcastVariable(name).iterator();
                this.out.write(stringSerializer.serializeWithoutTypeInfo(name));
                while (bcv.hasNext()) {
                    this.out.writeByte(1);
                    // Elements are expected to be serialized records already.
                    this.out.write((byte[])bcv.next());
                }
                this.out.writeByte(0);
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.stoppedRespondingFailure(ste);
        }
    }

    /**
     * Streams the records of a single input to the python process and collects its results until
     * the process signals that it has finished. Returns immediately if the input is empty.
     *
     * @param i input records
     * @param c collector receiving the results
     * @throws IOException      if communication with the process fails
     * @throws RuntimeException if the process reports an error, requests data although none is
     *                          left, or the socket times out
     */
    public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
        try {
            if (!i.hasNext()) {
                return;
            }
            while (true) {
                int sig = this.in.readInt();
                switch (sig) {
                    case SIGNAL_BUFFER_REQUEST: {
                        if (!i.hasNext() && !this.sender.hasRemaining(0)) {
                            throw new RuntimeException("External process requested data even though none is available.");
                        }
                        int size = this.sender.sendBuffer(i, 0);
                        this.sendWriteNotification(size, this.sender.hasRemaining(0) || i.hasNext());
                        break;
                    }
                    case SIGNAL_FINISHED: {
                        return;
                    }
                    case SIGNAL_ERROR: {
                        throw this.processErrorFailure();
                    }
                    default: {
                        // Not a control signal: hand the value to the receiver together with
                        // the collector, then acknowledge.
                        this.receiver.collectBuffer(c, sig);
                        this.sendReadConfirmation();
                    }
                }
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.stoppedRespondingFailure(ste);
        }
    }

    /**
     * Streams the records of two grouped inputs to the python process and collects its results
     * until the process signals that it has finished. Returns immediately if both inputs are
     * empty. A buffer request for a group with no data left is not answered.
     *
     * @param i1 records of group 0
     * @param i2 records of group 1
     * @param c  collector receiving the results
     * @throws IOException      if communication with the process fails
     * @throws RuntimeException if the process reports an error or the socket times out
     */
    public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
        try {
            if (!i1.hasNext() && !i2.hasNext()) {
                return;
            }
            while (true) {
                int sig = this.in.readInt();
                switch (sig) {
                    case SIGNAL_BUFFER_REQUEST_G0: {
                        if (i1.hasNext() || this.sender.hasRemaining(0)) {
                            int size = this.sender.sendBuffer(i1, 0);
                            this.sendWriteNotification(size, this.sender.hasRemaining(0) || i1.hasNext());
                        }
                        break;
                    }
                    case SIGNAL_BUFFER_REQUEST_G1: {
                        if (i2.hasNext() || this.sender.hasRemaining(1)) {
                            int size = this.sender.sendBuffer(i2, 1);
                            this.sendWriteNotification(size, this.sender.hasRemaining(1) || i2.hasNext());
                        }
                        break;
                    }
                    case SIGNAL_FINISHED: {
                        return;
                    }
                    case SIGNAL_ERROR: {
                        throw this.processErrorFailure();
                    }
                    default: {
                        // Not a control signal: hand the value to the receiver together with
                        // the collector, then acknowledge.
                        this.receiver.collectBuffer(c, sig);
                        this.sendReadConfirmation();
                    }
                }
            }
        }
        catch (SocketTimeoutException ste) {
            throw this.stoppedRespondingFailure(ste);
        }
    }
}

