package org.apache.flink.languagebinding.api.java.python.streaming;

import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.languagebinding.api.java.common.PlanBinder;
import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;

/* loaded from: input_file:org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.class */
public class PythonStreamer extends Streamer {
    private final byte[] operator;
    private Process process;
    private final String metaInformation;
    private final int id;
    private final boolean usePython3;
    private final boolean debug;
    private Thread shutdownThread;
    private String inputFilePath;
    private String outputFilePath;

    public PythonStreamer(AbstractRichFunction abstractRichFunction, int i, byte[] bArr, String str) {
        super(abstractRichFunction);
        this.operator = bArr;
        this.metaInformation = str;
        this.id = i;
        this.usePython3 = PythonPlanBinder.usePython3;
        this.debug = PlanBinder.DEBUG;
    }

    public void setupProcess() throws IOException {
        startPython();
    }

    private void startPython() throws IOException {
        this.outputFilePath = PlanBinder.FLINK_TMP_DATA_DIR + "/" + this.id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
        this.inputFilePath = PlanBinder.FLINK_TMP_DATA_DIR + "/" + this.id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
        this.sender.open(this.inputFilePath);
        this.receiver.open(this.outputFilePath);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        String str = this.function.getRuntimeContext().getDistributedCache().getFile(PythonPlanBinder.FLINK_PYTHON_DC_ID).getAbsolutePath() + PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
        String[] split = this.metaInformation.split("\\|");
        StringBuilder sb = new StringBuilder();
        if (split[0].contains("__main__")) {
            sb.append("from ");
            sb.append(PythonPlanBinder.FLINK_PYTHON_PLAN_NAME.substring(1, PythonPlanBinder.FLINK_PYTHON_PLAN_NAME.length() - 3));
            sb.append(" import ");
            sb.append(split[1]);
        } else {
            sb.append("import ");
            sb.append(PythonPlanBinder.FLINK_PYTHON_PLAN_NAME.substring(1, PythonPlanBinder.FLINK_PYTHON_PLAN_NAME.length() - 3));
        }
        String str2 = this.usePython3 ? PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH : PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
        try {
            Runtime.getRuntime().exec(str2);
            processBuilder.command(str2, "-O", "-B", str, "" + this.server.getLocalPort());
            if (this.debug) {
                this.socket.setSoTimeout(0);
                LOG.info("Waiting for Python Process : " + this.function.getRuntimeContext().getTaskName() + " Run python /tmp/flink" + PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME + " " + this.server.getLocalPort());
            } else {
                this.process = processBuilder.start();
                new StreamPrinter(this.process.getInputStream()).start();
                new StreamPrinter(this.process.getErrorStream(), true, this.msg).start();
            }
            this.shutdownThread = new Thread() { // from class: org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        PythonStreamer.this.destroyProcess();
                    } catch (IOException e) {
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(this.shutdownThread);
            this.socket = this.server.accept();
            this.in = this.socket.getInputStream();
            this.out = this.socket.getOutputStream();
            byte[] bArr = new byte[4];
            putInt(bArr, 0, this.operator.length);
            this.out.write(bArr, 0, 4);
            this.out.write(this.operator, 0, this.operator.length);
            byte[] bytes = sb.toString().getBytes("utf-8");
            putInt(bArr, 0, bytes.length);
            this.out.write(bArr, 0, 4);
            this.out.write(bytes, 0, bytes.length);
            byte[] bytes2 = this.inputFilePath.getBytes("utf-8");
            putInt(bArr, 0, bytes2.length);
            this.out.write(bArr, 0, 4);
            this.out.write(bytes2, 0, bytes2.length);
            byte[] bytes3 = this.outputFilePath.getBytes("utf-8");
            putInt(bArr, 0, bytes3.length);
            this.out.write(bArr, 0, 4);
            this.out.write(bytes3, 0, bytes3.length);
            this.out.flush();
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
            }
            if (this.debug) {
                return;
            }
            try {
                this.process.exitValue();
                throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " terminated prematurely." + ((Object) this.msg));
            } catch (IllegalThreadStateException e2) {
            }
        } catch (IOException e3) {
            throw new RuntimeException(str2 + " does not point to a valid python binary.");
        }
    }

    public void close() throws IOException {
        try {
            super.close();
        } catch (Exception e) {
            LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
        }
        if (!this.debug) {
            destroyProcess();
        }
        if (this.shutdownThread != null) {
            Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void destroyProcess() throws IOException {
        try {
            this.process.exitValue();
        } catch (IllegalThreadStateException e) {
            if (!this.process.getClass().getName().equals("java.lang.UNIXProcess")) {
                this.process.destroy();
                return;
            }
            try {
                Field declaredField = this.process.getClass().getDeclaredField("pid");
                declaredField.setAccessible(true);
                Runtime.getRuntime().exec(new String[]{"kill", "-9", "" + declaredField.getInt(this.process)});
            } catch (Throwable th) {
                this.process.destroy();
            }
        }
    }
}
