package org.apache.flink.python.api.streaming.data;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonStreamer.class */
/**
 * Runs an external Python process for one operator subtask and drives the
 * signal exchange with it.
 *
 * <p>{@link #open()} binds a local {@link ServerSocket}, launches the Python
 * interpreter with the plan file and hands it (via its standard input) the
 * port to connect back to plus the paths of the two data files. Control
 * signals then travel over the accepted socket, while the records themselves
 * are moved by the {@link PythonSender} and {@link PythonReceiver} that were
 * opened on those files.
 */
public class PythonStreamer implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);

    /** Signal read from the process: it requests the next buffer (ungrouped input). */
    private static final int SIGNAL_BUFFER_REQUEST = 0;
    /** Signal read from the process: it requests the next buffer of input group 0. */
    private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    /** Signal read from the process: it requests the next buffer of input group 1. */
    private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    /** Signal read from the process: it has finished. */
    private static final int SIGNAL_FINISHED = -1;
    /** Signal read from the process: it failed. */
    private static final int SIGNAL_ERROR = -2;
    /** Flag byte sent with a write notification when no further data remains. */
    private static final byte SIGNAL_LAST = 32;

    /** Pause used after launching the process and after it reported an error. */
    private static final long PROCESS_WAIT_MILLIS = 2000L;

    private final int id;
    private String inputFilePath;
    private String outputFilePath;
    private Process process;
    private Thread shutdownThread;
    protected ServerSocket server;
    protected Socket socket;
    protected DataInputStream in;
    protected DataOutputStream out;
    protected int port;
    protected PythonReceiver receiver;
    protected final AbstractRichFunction function;
    /** Receives the error output of the process; appended to failure messages. */
    protected StringBuilder msg = new StringBuilder();
    private final boolean usePython3 = PythonPlanBinder.usePython3;
    private final String planArguments = PythonPlanBinder.arguments.toString();
    protected PythonSender sender = new PythonSender();

    /**
     * @param abstractRichFunction function whose runtime context supplies the subtask
     *     index, task name, distributed cache and broadcast variables
     * @param i identifier of this streamer; part of the data file names and sent to
     *     the process
     * @param z flag forwarded unchanged to the {@link PythonReceiver} constructor
     */
    public PythonStreamer(AbstractRichFunction abstractRichFunction, int i, boolean z) {
        this.id = i;
        this.receiver = new PythonReceiver(z);
        this.function = abstractRichFunction;
    }

    /**
     * Binds the server socket and starts the external process.
     *
     * @throws IOException if the server socket, the sender or the receiver cannot be opened
     * @throws RuntimeException if the process cannot be launched, exits prematurely, or
     *     the connection to it cannot be established
     */
    public void open() throws IOException {
        // Port 0 lets the operating system pick a free port; it is reported to the
        // process through getLocalPort().
        this.server = new ServerSocket(0);
        startPython();
    }

    /**
     * Opens the data files, launches the interpreter, performs the start-up
     * handshake over its standard input and accepts its socket connection.
     */
    private void startPython() throws IOException {
        int subtaskIndex = this.function.getRuntimeContext().getIndexOfThisSubtask();
        String filePrefix = PythonPlanBinder.FLINK_TMP_DATA_DIR + "/" + this.id + subtaskIndex;
        this.outputFilePath = filePrefix + "output";
        this.inputFilePath = filePrefix + "input";
        this.sender.open(this.inputFilePath);
        this.receiver.open(this.outputFilePath);

        String planPath = this.function.getRuntimeContext().getDistributedCache()
                .getFile(PythonPlanBinder.FLINK_PYTHON_DC_ID).getAbsolutePath()
                + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
        String pythonBinary = this.usePython3
                ? PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH
                : PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;

        // A missing or non-executable binary makes this launch fail directly, so no
        // separate probe process (which would never be cleaned up) is started.
        try {
            this.process = Runtime.getRuntime().exec(pythonBinary + " -O -B " + planPath + this.planArguments);
        } catch (IOException e) {
            throw new RuntimeException(pythonBinary + " does not point to a valid python binary.", e);
        }

        new StreamPrinter(this.process.getInputStream()).start();
        new StreamPrinter(this.process.getErrorStream(), true, this.msg).start();

        this.shutdownThread = new Thread() {
            @Override
            public void run() {
                try {
                    PythonStreamer.this.destroyProcess();
                } catch (IOException ignored) {
                    // The JVM is shutting down; nothing useful can be done about a failed kill.
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);

        try {
            OutputStream processInput = this.process.getOutputStream();
            processInput.write("operator\n".getBytes());
            processInput.write((this.server.getLocalPort() + "\n").getBytes());
            processInput.write((this.id + "\n").getBytes());
            processInput.write((this.inputFilePath + "\n").getBytes());
            processInput.write((this.outputFilePath + "\n").getBytes());
            processInput.flush();

            // Give the process a moment to start, so that an immediate exit is
            // reported instead of blocking in accept().
            pause(PROCESS_WAIT_MILLIS);
            if (hasExited(this.process)) {
                throw new RuntimeException(describeProcess() + " terminated prematurely." + this.msg);
            }

            this.socket = this.server.accept();
            this.in = new DataInputStream(this.socket.getInputStream());
            this.out = new DataOutputStream(this.socket.getOutputStream());
        } catch (IOException e) {
            // Still a RuntimeException, as before, but with an accurate message and the cause attached.
            throw new RuntimeException(describeProcess() + " could not be connected." + this.msg, e);
        }
    }

    /**
     * Releases the sockets, the sender and the receiver, terminates the process
     * and unregisters the shutdown hook. Each resource is closed independently,
     * so a failure on one does not leak the others; such failures are logged.
     *
     * @throws IOException declared for callers; propagated from {@link #destroyProcess()}
     */
    public void close() throws IOException {
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        } catch (Exception e) {
            logCloseFailure(e);
        }
        try {
            this.sender.close();
        } catch (Exception e) {
            logCloseFailure(e);
        }
        try {
            this.receiver.close();
        } catch (Exception e) {
            logCloseFailure(e);
        }
        try {
            if (this.server != null) {
                this.server.close();
            }
        } catch (Exception e) {
            logCloseFailure(e);
        }

        try {
            destroyProcess();
        } finally {
            removeShutdownHook();
        }
    }

    /** Logs a failure that occurred while closing one of the resources. */
    private static void logCloseFailure(Exception e) {
        LOG.error("Exception occurred while closing Streamer. :" + e.getMessage(), e);
    }

    /** Unregisters the shutdown hook if one was installed. */
    private void removeShutdownHook() {
        if (this.shutdownThread == null) {
            return;
        }
        try {
            Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down; the hook runs (or has run) on its own.
        }
    }

    /**
     * Terminates the external process if it was started and is still running.
     *
     * <p>For a {@code java.lang.UNIXProcess} the pid is read reflectively and
     * {@code kill -9} is issued; in every other case, or if that attempt fails,
     * {@link Process#destroy()} is used.
     *
     * @throws IOException declared for callers; failures of the kill attempt are
     *     handled by falling back to {@link Process#destroy()}
     */
    public void destroyProcess() throws IOException {
        Process target = this.process;
        if (target == null || hasExited(target)) {
            return;
        }
        if (!"java.lang.UNIXProcess".equals(target.getClass().getName())) {
            target.destroy();
            return;
        }
        try {
            Field pidField = target.getClass().getDeclaredField("pid");
            pidField.setAccessible(true);
            Runtime.getRuntime().exec(new String[]{"kill", "-9", "" + pidField.getInt(target)});
        } catch (Exception e) {
            // Reflection or the kill command failed; fall back to the regular API.
            target.destroy();
        }
    }

    /** Returns whether the given process has already terminated. */
    private static boolean hasExited(Process candidate) {
        try {
            candidate.exitValue();
            return true;
        } catch (IllegalThreadStateException stillRunning) {
            return false;
        }
    }

    /** Sleeps for the given time; an interrupt ends the pause and is kept pending. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Common prefix of all failure messages, naming the task of this streamer. */
    private String describeProcess() {
        return "External process for task " + this.function.getRuntimeContext().getTaskName();
    }

    /** Builds the failure raised when socket communication with the process times out. */
    private RuntimeException stoppedResponding(SocketTimeoutException cause) {
        return new RuntimeException(describeProcess() + " stopped responding." + this.msg, cause);
    }

    /**
     * Builds the failure raised after the process sent {@link #SIGNAL_ERROR}.
     * Pauses first, presumably so that the error output of the process has been
     * collected in {@link #msg} before the message is assembled.
     */
    private RuntimeException terminatedWithError() {
        pause(PROCESS_WAIT_MILLIS);
        return new RuntimeException(describeProcess() + " terminated prematurely due to an error." + this.msg);
    }

    /**
     * Announces a buffer that was just written for the process.
     *
     * @param i size value returned by the sender for that buffer
     * @param z {@code true} if more data will follow, {@code false} if this was the last buffer
     */
    private void sendWriteNotification(int i, boolean z) throws IOException {
        this.out.writeInt(i);
        this.out.writeByte(z ? 0 : SIGNAL_LAST);
        this.out.flush();
    }

    /** Confirms to the process that its buffer has been collected. */
    private void sendReadConfirmation() throws IOException {
        this.out.writeByte(1);
        this.out.flush();
    }

    /**
     * Sends all broadcast variables listed in the configuration to the process.
     *
     * <p>Wire format: the serialized variable count, then for each variable its
     * serialized name followed by its elements, each prefixed with a {@code 1}
     * byte, and a terminating {@code 0} byte.
     *
     * @param configuration holds the variable count and the indexed variable names
     * @throws IOException if writing to the socket fails
     * @throws RuntimeException if the socket communication times out
     */
    public final void sendBroadCastVariables(Configuration configuration) throws IOException {
        try {
            int count = configuration.getInteger(PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT, 0);
            String[] names = new String[count];
            for (int index = 0; index < names.length; index++) {
                names[index] = configuration.getString(PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + index, (String) null);
            }

            this.out.write(new SerializationUtils.IntSerializer().serializeWithoutTypeInfo(Integer.valueOf(count)));
            SerializationUtils.StringSerializer nameSerializer = new SerializationUtils.StringSerializer();
            for (String name : names) {
                // Look the variable up before writing its name, so a failed lookup
                // leaves no partial entry on the wire.
                Iterator<?> elements = this.function.getRuntimeContext().getBroadcastVariable(name).iterator();
                this.out.write(nameSerializer.serializeWithoutTypeInfo(name));
                while (elements.hasNext()) {
                    this.out.writeByte(1);
                    this.out.write((byte[]) elements.next());
                }
                this.out.writeByte(0);
            }
            this.out.flush();
        } catch (SocketTimeoutException e) {
            throw stoppedResponding(e);
        }
    }

    /**
     * Serves the process from a single input until it signals completion.
     * Returns immediately if the input is empty. Buffer requests are answered
     * from {@code it}; any signal other than the known ones is passed to the
     * receiver as the second argument of {@code collectBuffer} and confirmed.
     *
     * @param it input records
     * @param collector receives the records produced by the process
     * @throws IOException if the socket communication fails
     * @throws RuntimeException if the process reports an error, requests data when none
     *     is left, or the socket communication times out
     */
    @SuppressWarnings("rawtypes") // raw parameter types are part of the existing signature
    public final void streamBufferWithoutGroups(Iterator it, Collector collector) throws IOException {
        if (!it.hasNext()) {
            return;
        }
        try {
            while (true) {
                int signal = this.in.readInt();
                switch (signal) {
                    case SIGNAL_ERROR:
                        throw terminatedWithError();
                    case SIGNAL_FINISHED:
                        return;
                    case SIGNAL_BUFFER_REQUEST:
                        if (!it.hasNext() && !this.sender.hasRemaining(0)) {
                            throw new RuntimeException("External process requested data even though none is available.");
                        }
                        int size = this.sender.sendBuffer(it, 0);
                        sendWriteNotification(size, this.sender.hasRemaining(0) || it.hasNext());
                        break;
                    default:
                        this.receiver.collectBuffer(collector, signal);
                        sendReadConfirmation();
                        break;
                }
            }
        } catch (SocketTimeoutException e) {
            throw stoppedResponding(e);
        }
    }

    /**
     * Serves the process from two inputs until it signals completion.
     * Returns immediately if both inputs are empty. A buffer request for a group
     * that has no data left is not answered; any signal other than the known ones
     * is passed to the receiver as the second argument of {@code collectBuffer}
     * and confirmed.
     *
     * @param it records of input group 0
     * @param it2 records of input group 1
     * @param collector receives the records produced by the process
     * @throws IOException if the socket communication fails
     * @throws RuntimeException if the process reports an error or the socket
     *     communication times out
     */
    @SuppressWarnings("rawtypes") // raw parameter types are part of the existing signature
    public final void streamBufferWithGroups(Iterator it, Iterator it2, Collector collector) throws IOException {
        if (!it.hasNext() && !it2.hasNext()) {
            return;
        }
        try {
            while (true) {
                int signal = this.in.readInt();
                switch (signal) {
                    case SIGNAL_BUFFER_REQUEST_G1:
                        if (it2.hasNext() || this.sender.hasRemaining(1)) {
                            int size = this.sender.sendBuffer(it2, 1);
                            sendWriteNotification(size, this.sender.hasRemaining(1) || it2.hasNext());
                        }
                        break;
                    case SIGNAL_BUFFER_REQUEST_G0:
                        if (it.hasNext() || this.sender.hasRemaining(0)) {
                            int size = this.sender.sendBuffer(it, 0);
                            sendWriteNotification(size, this.sender.hasRemaining(0) || it.hasNext());
                        }
                        break;
                    case SIGNAL_ERROR:
                        throw terminatedWithError();
                    case SIGNAL_FINISHED:
                        return;
                    default:
                        this.receiver.collectBuffer(collector, signal);
                        sendReadConfirmation();
                        break;
                }
            }
        } catch (SocketTimeoutException e) {
            throw stoppedResponding(e);
        }
    }
}
