package org.apache.nifi.py4j;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.py4j.StandardPythonProcessorBridge;
import org.apache.nifi.py4j.client.JavaObjectBindings;
import org.apache.nifi.py4j.client.NiFiPythonGateway;
import org.apache.nifi.py4j.client.StandardPythonClient;
import org.apache.nifi.py4j.logging.LogLevelChangeListener;
import org.apache.nifi.py4j.logging.PythonLogLevel;
import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.py4j.server.NiFiGatewayServer;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.CallbackClient;
import py4j.GatewayServer;

/* loaded from: input_file:org/apache/nifi/py4j/PythonProcess.class */
public class PythonProcess {
    private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
    private static final String PYTHON_CONTROLLER_FILENAME = "Controller.py";
    private static final String LOG_READER_THREAD_NAME_FORMAT = "python-log-%d";
    private final PythonProcessConfig processConfig;
    private final ControllerServiceTypeLookup controllerServiceTypeLookup;
    private final File virtualEnvHome;
    private final boolean packagedWithDependencies;
    private final String componentType;
    private final String componentId;
    private GatewayServer server;
    private PythonController controller;
    private Process process;
    private NiFiPythonGateway gateway;
    private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap();
    private final Set<CreatedProcessor> createdProcessors = new CopyOnWriteArraySet();
    private volatile boolean shutdown = false;
    private volatile List<String> extensionDirs;
    private volatile String workDir;
    private Thread logReaderThread;
    private String logListenerId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/py4j/PythonProcess$CreatedProcessor.class */
    public static final class CreatedProcessor extends Record {
        private final String identifier;
        private final String type;
        private final PythonProcessorBridge processorBridge;

        private CreatedProcessor(String str, String str2, PythonProcessorBridge pythonProcessorBridge) {
            this.identifier = str;
            this.type = str2;
            this.processorBridge = pythonProcessorBridge;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, CreatedProcessor.class), CreatedProcessor.class, "identifier;type;processorBridge", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->identifier:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->type:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->processorBridge:Lorg/apache/nifi/python/processor/PythonProcessorBridge;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, CreatedProcessor.class), CreatedProcessor.class, "identifier;type;processorBridge", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->identifier:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->type:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->processorBridge:Lorg/apache/nifi/python/processor/PythonProcessorBridge;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, CreatedProcessor.class, Object.class), CreatedProcessor.class, "identifier;type;processorBridge", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->identifier:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->type:Ljava/lang/String;", "FIELD:Lorg/apache/nifi/py4j/PythonProcess$CreatedProcessor;->processorBridge:Lorg/apache/nifi/python/processor/PythonProcessorBridge;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String identifier() {
            return this.identifier;
        }

        public String type() {
            return this.type;
        }

        public PythonProcessorBridge processorBridge() {
            return this.processorBridge;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/py4j/PythonProcess$PythonProcessLogLevelChangeListener.class */
    public class PythonProcessLogLevelChangeListener implements LogLevelChangeListener {
        private PythonProcessLogLevelChangeListener() {
        }

        @Override // org.apache.nifi.py4j.logging.LogLevelChangeListener
        public void onLevelChange(String str, LogLevel logLevel) {
            PythonProcess.this.controller.setLoggerLevel(str, PythonLogLevel.valueOf(logLevel).getLevel());
        }
    }

    public PythonProcess(PythonProcessConfig pythonProcessConfig, ControllerServiceTypeLookup controllerServiceTypeLookup, File file, boolean z, String str, String str2) {
        this.processConfig = pythonProcessConfig;
        this.controllerServiceTypeLookup = controllerServiceTypeLookup;
        this.virtualEnvHome = file;
        this.packagedWithDependencies = z;
        this.componentType = str;
        this.componentId = str2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PythonController getCurrentController() {
        return this.controller;
    }

    public synchronized void start() throws IOException {
        String ping;
        ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
        SocketFactory socketFactory = SocketFactory.getDefault();
        int millis = (int) this.processConfig.getCommsTimeout().toMillis();
        String generateAuthToken = generateAuthToken();
        this.gateway = new NiFiPythonGateway(new JavaObjectBindings(), null, new CallbackClient(25334, GatewayServer.defaultAddress(), generateAuthToken, 50000L, TimeUnit.MILLISECONDS, socketFactory, false, millis));
        this.gateway.startup();
        this.server = new NiFiGatewayServer(this.gateway, 0, GatewayServer.defaultAddress(), millis, millis, Collections.emptyList(), serverSocketFactory, generateAuthToken, this.componentType, this.componentId);
        this.server.start();
        int listeningPort = this.server.getListeningPort();
        setupEnvironment();
        this.process = launchPythonProcess(listeningPort, generateAuthToken);
        this.process.onExit().thenAccept(this::handlePythonProcessDied);
        this.logReaderThread = Thread.ofVirtual().name(LOG_READER_THREAD_NAME_FORMAT.formatted(Long.valueOf(this.process.pid()))).start(new PythonProcessLogReader(this.process.inputReader(StandardCharsets.UTF_8)));
        this.controller = new StandardPythonClient(this.gateway).getController();
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
        Exception exc = null;
        boolean z = false;
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                ping = this.controller.ping();
                z = "pong".equals(ping);
            } catch (Exception e) {
                exc = e;
                logger.debug("Failed to start Py4J Server", e);
            }
            if (z) {
                break;
            }
            logger.debug("Got unexpected response from Py4J Server during ping: {}", ping);
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (!z && exc != null) {
            throw new RuntimeException("Failed to start Python Bridge", exc);
        }
        this.logListenerId = Long.toString(this.process.pid());
        StandardLogLevelChangeHandler.getHandler().addListener(this.logListenerId, new PythonProcessLogLevelChangeListener());
        this.controller.setControllerServiceTypeLookup(this.controllerServiceTypeLookup);
        logger.info("Successfully started and pinged Python Server. Python Process = {}", this.process);
    }

    private void handlePythonProcessDied(Process process) {
        if (isShutdown()) {
            logger.info("Python Process {} exited with code {}", process, Integer.valueOf(process.exitValue()));
            return;
        }
        List list = this.createdProcessors.stream().map(createdProcessor -> {
            return "%s (%s)".formatted(createdProcessor.identifier(), createdProcessor.type());
        }).toList();
        logger.error("Python Process {} with Processors {} died unexpectedly with exit code {}. Restarting...", new Object[]{process, list, Integer.valueOf(process.exitValue())});
        long j = 1000;
        while (!isShutdown()) {
            try {
                killProcess();
                start();
                if (this.extensionDirs == null || this.workDir == null) {
                    return;
                }
                discoverExtensions(this.extensionDirs, this.workDir);
                recreateProcessors();
                return;
            } catch (Exception e) {
                logger.error("Failed to restart Python Process with Processors {}; will keep trying", list, e);
                try {
                    Thread.sleep(j);
                    j = Math.min(60000L, j * 2);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e2);
                }
            }
        }
    }

    private String generateAuthToken() {
        byte[] bArr = new byte[20];
        new SecureRandom().nextBytes(bArr);
        return Base64.getEncoder().encodeToString(bArr);
    }

    private boolean isPackagedWithDependencies() {
        return this.packagedWithDependencies;
    }

    private Process launchPythonProcess(int i, String str) throws IOException {
        File pythonFrameworkDirectory = this.processConfig.getPythonFrameworkDirectory();
        File file = new File(pythonFrameworkDirectory.getParentFile(), "api");
        String resolvePythonCommand = resolvePythonCommand();
        File file2 = new File(pythonFrameworkDirectory, PYTHON_CONTROLLER_FILENAME);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        ArrayList arrayList = new ArrayList();
        arrayList.add(resolvePythonCommand);
        if (isPackagedWithDependencies()) {
            arrayList.add("-S");
        }
        String absolutePath = file.getAbsolutePath();
        String absolutePath2 = this.virtualEnvHome.getAbsolutePath();
        String str2 = absolutePath + File.pathSeparator + absolutePath2;
        if (isPackagedWithDependencies()) {
            str2 = str2 + File.pathSeparator + new File(new File(absolutePath2), "NAR-INF/bundled-dependencies").getAbsolutePath();
        }
        if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
            arrayList.add("-m");
            arrayList.add("debugpy");
            arrayList.add("--listen");
            arrayList.add(this.processConfig.getDebugHost() + ":" + this.processConfig.getDebugPort());
            arrayList.add("--log-to-stderr");
            str2 = str2 + File.pathSeparator + this.virtualEnvHome.getAbsolutePath();
        }
        arrayList.add(file2.getAbsolutePath());
        processBuilder.command(arrayList);
        processBuilder.environment().put("JAVA_PORT", String.valueOf(i));
        processBuilder.environment().put("ENV_HOME", this.virtualEnvHome.getAbsolutePath());
        processBuilder.environment().put("PYTHONPATH", str2);
        processBuilder.environment().put("PYTHON_CMD", resolvePythonCommand);
        processBuilder.environment().put("AUTH_TOKEN", str);
        processBuilder.redirectErrorStream(true);
        logger.info("Launching Python Process {} {} with working directory {} to communicate with Java on Port {}", new Object[]{resolvePythonCommand, file2.getAbsolutePath(), this.virtualEnvHome, Integer.valueOf(i)});
        return processBuilder.start();
    }

    String resolvePythonCommand() throws IOException {
        if (isPackagedWithDependencies()) {
            return this.processConfig.getPythonCommand();
        }
        String name = new File(this.processConfig.getPythonCommand()).getName();
        File[] listFiles = this.virtualEnvHome.listFiles((file, str) -> {
            return file.isDirectory() && (str.equals("bin") || str.equals("Scripts"));
        });
        if (listFiles == null || listFiles.length == 0) {
            throw new IOException("Python binary directory could not be found in " + String.valueOf(this.virtualEnvHome));
        }
        return new File(this.virtualEnvHome, (listFiles.length == 1 ? listFiles[0].getName() : findExecutableDirectory(name, listFiles)) + File.separator + name).getAbsolutePath();
    }

    String findExecutableDirectory(String str, File[] fileArr) throws IOException {
        return ((File) List.of((Object[]) fileArr).stream().filter(file -> {
            return ArrayUtils.isNotEmpty(file.list((file, str2) -> {
                return str2.startsWith(str);
            }));
        }).findFirst().orElseThrow(() -> {
            return new IOException("Failed to find Python command [%s]".formatted(str));
        })).getName();
    }

    private void setupEnvironment() throws IOException {
        if (isPackagedWithDependencies()) {
            logger.debug("Will not create Python Virtual Environment because Python Processor packaged with dependencies");
            return;
        }
        File file = new File(this.virtualEnvHome, "env-creation-complete.txt");
        if (file.exists()) {
            logger.debug("Environment has already been created for {}; will not recreate", this.virtualEnvHome);
            return;
        }
        logger.info("Creating Python Virtual Environment {}", this.virtualEnvHome);
        Files.createDirectories(this.virtualEnvHome.toPath(), new FileAttribute[0]);
        ProcessBuilder processBuilder = new ProcessBuilder(this.processConfig.getPythonCommand(), "-m", "venv", this.virtualEnvHome.getAbsolutePath());
        processBuilder.directory(this.virtualEnvHome.getParentFile());
        logger.debug("Creating Python Virtual Environment {} using command {}", this.virtualEnvHome, String.join(" ", processBuilder.command()));
        try {
            int waitFor = processBuilder.start().waitFor();
            if (waitFor != 0) {
                throw new IOException("Failed to create Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + waitFor);
            }
            if (this.processConfig.isDebugController() && "Controller".equals(this.componentId)) {
                installDebugPy();
            }
            file.createNewFile();
            logger.info("Successfully created Python Virtual Environment {}", this.virtualEnvHome);
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for Python virtual environment to be created");
        }
    }

    private void installDebugPy() throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(this.processConfig.getPythonCommand(), "-m", "pip", "install", "--no-cache-dir", "--upgrade", "debugpy", "--target", this.virtualEnvHome.getAbsolutePath());
        processBuilder.directory(this.virtualEnvHome);
        logger.debug("Installing DebugPy to Virtual Env {} using command {}", this.virtualEnvHome, String.join(" ", processBuilder.command()));
        try {
            int waitFor = processBuilder.start().waitFor();
            if (waitFor != 0) {
                throw new IOException("Failed to install DebugPy for Python Environment " + String.valueOf(this.virtualEnvHome) + ": process existed with code " + waitFor);
            }
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for DebugPy to be installed");
        }
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    public void shutdown() {
        this.shutdown = true;
        logger.info("Shutting down Python Process {}", this.process);
        killProcess();
    }

    private synchronized void killProcess() {
        if (this.logListenerId != null) {
            StandardLogLevelChangeHandler.getHandler().removeListener(this.logListenerId);
        }
        if (this.server != null) {
            try {
                this.server.shutdown();
            } catch (Exception e) {
                logger.error("Failed to cleanly shutdown Py4J server", e);
            }
            this.server = null;
        }
        if (this.gateway != null) {
            try {
                this.gateway.shutdown(true);
            } catch (Exception e2) {
                logger.error("Failed to cleanly shutdown Py4J Gateway", e2);
            }
            this.gateway = null;
        }
        if (this.process != null) {
            try {
                this.process.destroyForcibly();
            } catch (Exception e3) {
                logger.error("Failed to cleanly shutdown Py4J process", e3);
            }
            this.process = null;
        }
        if (this.logReaderThread != null) {
            this.logReaderThread.interrupt();
        }
    }

    public void discoverExtensions(List<String> list, String str) {
        this.extensionDirs = new ArrayList(list);
        this.workDir = str;
        this.controller.discoverExtensions(list, str);
    }

    public PythonProcessorBridge createProcessor(String str, final String str2, final String str3, final String str4) {
        ProcessorCreationWorkflow processorCreationWorkflow = new ProcessorCreationWorkflow() { // from class: org.apache.nifi.py4j.PythonProcess.1
            @Override // org.apache.nifi.py4j.ProcessorCreationWorkflow
            public boolean isPackagedWithDependencies() {
                return PythonProcess.this.packagedWithDependencies;
            }

            @Override // org.apache.nifi.py4j.ProcessorCreationWorkflow
            public void downloadDependencies() {
                if (PythonProcess.this.packagedWithDependencies) {
                    return;
                }
                PythonProcess.this.controller.downloadDependencies(str2, str3, str4);
            }

            @Override // org.apache.nifi.py4j.ProcessorCreationWorkflow
            public PythonProcessorAdapter createProcessor() {
                return PythonProcess.this.controller.createProcessor(str2, str3, str4);
            }
        };
        PythonProcessorDetails processorDetails = this.controller.getProcessorDetails(str2, str3);
        try {
            String processorType = processorDetails.getProcessorType();
            StandardPythonProcessorBridge build = new StandardPythonProcessorBridge.Builder().controller(this.controller).creationWorkflow(processorCreationWorkflow).processorType(processorType).processorVersion(processorDetails.getProcessorVersion()).workingDirectory(this.processConfig.getPythonWorkingDirectory()).moduleFile(new File(this.controller.getModuleFile(str2, str3))).build();
            this.createdProcessors.add(new CreatedProcessor(str, str2, build));
            processorDetails.free();
            return build;
        } catch (Throwable th) {
            processorDetails.free();
            throw th;
        }
    }

    private void recreateProcessors() {
        for (CreatedProcessor createdProcessor : this.createdProcessors) {
            createdProcessor.processorBridge().replaceController(this.controller);
            logger.info("Recreated Processor {} ({}) in Python Process {}", new Object[]{createdProcessor.identifier(), createdProcessor.type(), this.process});
        }
    }

    public boolean containsIsolatedProcessor() {
        return this.processorPrefersIsolation.containsValue(Boolean.TRUE);
    }

    public boolean removeProcessor(String str) {
        return this.processorPrefersIsolation.remove(str) != null;
    }

    public int getProcessorCount() {
        return this.processorPrefersIsolation.size();
    }

    public Map<String, Integer> getJavaObjectBindingCounts() {
        return this.gateway.getObjectBindings().getCountsPerClass();
    }
}
