package org.apache.nifi.py4j;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.nifi.py4j.client.JavaObjectBindings;
import org.apache.nifi.py4j.client.NiFiPythonGateway;
import org.apache.nifi.py4j.client.StandardPythonClient;
import org.apache.nifi.py4j.server.NiFiGatewayServer;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.CallbackClient;
import py4j.GatewayServer;

/* loaded from: input_file:org/apache/nifi/py4j/PythonProcess.class */
public class PythonProcess {
    private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
    private static final String PYTHON_CONTROLLER_FILENAME = "Controller.py";
    private final PythonProcessConfig processConfig;
    private final ControllerServiceTypeLookup controllerServiceTypeLookup;
    private final File virtualEnvHome;
    private final String componentType;
    private final String componentId;
    private GatewayServer server;
    private PythonController controller;
    private Process process;
    private NiFiPythonGateway gateway;
    private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap();

    public PythonProcess(PythonProcessConfig pythonProcessConfig, ControllerServiceTypeLookup controllerServiceTypeLookup, File file, String str, String str2) {
        this.processConfig = pythonProcessConfig;
        this.controllerServiceTypeLookup = controllerServiceTypeLookup;
        this.virtualEnvHome = file;
        this.componentType = str;
        this.componentId = str2;
    }

    public PythonController getController() {
        return this.controller;
    }

    public void start() throws IOException {
        String ping;
        ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
        SocketFactory socketFactory = SocketFactory.getDefault();
        int millis = (int) this.processConfig.getCommsTimeout().toMillis();
        String generateAuthToken = generateAuthToken();
        this.gateway = new NiFiPythonGateway(new JavaObjectBindings(), null, new CallbackClient(25334, GatewayServer.defaultAddress(), generateAuthToken, 50000L, TimeUnit.MILLISECONDS, socketFactory, false, millis));
        this.gateway.startup();
        this.server = new NiFiGatewayServer(this.gateway, 0, GatewayServer.defaultAddress(), millis, millis, Collections.emptyList(), serverSocketFactory, generateAuthToken, this.componentType, this.componentId);
        this.server.start();
        int listeningPort = this.server.getListeningPort();
        setupEnvironment();
        this.process = launchPythonProcess(listeningPort, generateAuthToken);
        this.controller = new StandardPythonClient(this.gateway).getController();
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
        Exception exc = null;
        boolean z = false;
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                ping = this.controller.ping();
                z = "pong".equals(ping);
            } catch (Exception e) {
                exc = e;
                logger.debug("Failed to start Py4J Server", e);
            }
            if (z) {
                break;
            }
            logger.debug("Got unexpected response from Py4J Server during ping: {}", ping);
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (!z && exc != null) {
            throw new RuntimeException("Failed to start Python Bridge", exc);
        }
        this.controller.setControllerServiceTypeLookup(this.controllerServiceTypeLookup);
        logger.info("Successfully started and pinged Python Server. Python Process = {}", this.process);
    }

    private String generateAuthToken() {
        byte[] bArr = new byte[20];
        new SecureRandom().nextBytes(bArr);
        return Base64.getEncoder().encodeToString(bArr);
    }

    private Process launchPythonProcess(int i, String str) throws IOException {
        File pythonFrameworkDirectory = this.processConfig.getPythonFrameworkDirectory();
        File file = new File(pythonFrameworkDirectory.getParentFile(), "api");
        File pythonLogsDirectory = this.processConfig.getPythonLogsDirectory();
        File file2 = new File(this.virtualEnvHome, "bin/" + new File(this.processConfig.getPythonCommand()).getName());
        String absolutePath = file2.getAbsolutePath();
        File file3 = new File(pythonFrameworkDirectory, PYTHON_CONTROLLER_FILENAME);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        ArrayList arrayList = new ArrayList();
        arrayList.add(absolutePath);
        String absolutePath2 = file.getAbsolutePath();
        if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
            arrayList.add("-m");
            arrayList.add("debugpy");
            arrayList.add("--listen");
            arrayList.add(this.processConfig.getDebugHost() + ":" + this.processConfig.getDebugPort());
            arrayList.add("--log-to");
            arrayList.add(this.processConfig.getDebugLogsDirectory().getAbsolutePath());
            absolutePath2 = absolutePath2 + File.pathSeparator + this.virtualEnvHome.getAbsolutePath();
        }
        arrayList.add(file3.getAbsolutePath());
        processBuilder.command(arrayList);
        processBuilder.environment().put("JAVA_PORT", String.valueOf(i));
        processBuilder.environment().put("LOGS_DIR", pythonLogsDirectory.getAbsolutePath());
        processBuilder.environment().put("ENV_HOME", this.virtualEnvHome.getAbsolutePath());
        processBuilder.environment().put("PYTHONPATH", absolutePath2);
        processBuilder.environment().put("PYTHON_CMD", file2.getAbsolutePath());
        processBuilder.environment().put("AUTH_TOKEN", str);
        processBuilder.inheritIO();
        logger.info("Launching Python Process {} {} with working directory {} to communicate with Java on Port {}", new Object[]{absolutePath, file3.getAbsolutePath(), this.virtualEnvHome, Integer.valueOf(i)});
        return processBuilder.start();
    }

    private void setupEnvironment() throws IOException {
        File file = new File(this.virtualEnvHome, "env-creation-complete.txt");
        if (file.exists()) {
            logger.debug("Environment has already been created for {}; will not recreate", this.virtualEnvHome);
            return;
        }
        logger.info("Creating Python Virtual Environment {}", this.virtualEnvHome);
        Files.createDirectories(this.virtualEnvHome.toPath(), new FileAttribute[0]);
        ProcessBuilder processBuilder = new ProcessBuilder(this.processConfig.getPythonCommand(), "-m", "venv", this.virtualEnvHome.getAbsolutePath());
        processBuilder.directory(this.virtualEnvHome.getParentFile());
        logger.debug("Creating Python Virtual Environment {} using command {}", this.virtualEnvHome, String.join(" ", processBuilder.command()));
        try {
            int waitFor = processBuilder.start().waitFor();
            if (waitFor != 0) {
                throw new IOException("Failed to create Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + waitFor);
            }
            if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
                installDebugPy();
            }
            file.createNewFile();
            logger.info("Successfully created Python Virtual Environment {}", this.virtualEnvHome);
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for Python virtual environment to be created");
        }
    }

    private void installDebugPy() throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(this.processConfig.getPythonCommand(), "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target", this.virtualEnvHome.getAbsolutePath());
        processBuilder.directory(this.virtualEnvHome);
        logger.debug("Installing DebugPy to Virtual Env {} using command {}", this.virtualEnvHome, String.join(" ", processBuilder.command()));
        try {
            int waitFor = processBuilder.start().waitFor();
            if (waitFor != 0) {
                throw new IOException("Failed to install DebugPy for Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + waitFor);
            }
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for DebugPy to be installed");
        }
    }

    public void shutdown() {
        logger.info("Shutting down Python Process {}", this.process);
        if (this.server != null) {
            try {
                this.server.shutdown();
            } catch (Exception e) {
                logger.error("Failed to cleanly shutdown Py4J server", e);
            }
        }
        if (this.gateway != null) {
            try {
                this.gateway.shutdown(true);
            } catch (Exception e2) {
                logger.error("Failed to cleanly shutdown Py4J Gateway", e2);
            }
        }
        if (this.process != null) {
            try {
                this.process.destroyForcibly();
            } catch (Exception e3) {
                logger.error("Failed to cleanly shutdown Py4J process", e3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addProcessor(String str, boolean z) {
        this.processorPrefersIsolation.put(str, Boolean.valueOf(z));
    }

    public boolean containsIsolatedProcessor() {
        return this.processorPrefersIsolation.containsValue(Boolean.TRUE);
    }

    public boolean removeProcessor(String str) {
        return this.processorPrefersIsolation.remove(str) != null;
    }

    public int getProcessorCount() {
        return this.processorPrefersIsolation.size();
    }

    public Map<String, Integer> getJavaObjectBindingCounts() {
        return this.gateway.getObjectBindings().getCountsPerClass();
    }
}
