package org.apache.nifi.py4j;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.py4j.logback.LevelChangeListener;
import org.apache.nifi.py4j.logging.StandardLogLevelChangeHandler;
import org.apache.nifi.python.BoundObjectCounts;
import org.apache.nifi.python.ControllerServiceTypeLookup;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
import org.apache.nifi.python.PythonProcessorDetails;
import org.apache.nifi.python.processor.FlowFileSource;
import org.apache.nifi.python.processor.FlowFileSourceProxy;
import org.apache.nifi.python.processor.FlowFileTransform;
import org.apache.nifi.python.processor.FlowFileTransformProxy;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.RecordTransform;
import org.apache.nifi.python.processor.RecordTransformProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/py4j/StandardPythonBridge.class */
public class StandardPythonBridge implements PythonBridge {
    /** Logger shared by every instance of this bridge. */
    private static final Logger logger = LoggerFactory.getLogger(StandardPythonBridge.class);

    /** Python process configuration; assigned by {@code initialize}. */
    private PythonProcessConfig processConfig;

    /** Controller service type lookup handed to every {@code PythonProcess} this bridge creates. */
    private ControllerServiceTypeLookup serviceTypeLookup;

    /** Supplies the NAR directories that are added to the extension search path. */
    private Supplier<Set<File>> narDirectoryLookup;

    /** The "Controller" process created by {@code start}; {@code null} until then. */
    private PythonProcess controllerProcess;

    /** Set to {@code true} once the controller process has started and back to {@code false} on shutdown. */
    private volatile boolean running = false;

    /** Per-extension processor count: incremented when a processor bridge is created, decremented on removal. */
    private final Map<ExtensionId, Integer> processorCountByType = new ConcurrentHashMap<>();

    /** Python processes launched for each extension, keyed by extension type and version. */
    private final Map<ExtensionId, List<PythonProcess>> processesByProcessorType = new ConcurrentHashMap<>();

    /**
     * Identifies a Python extension by processor type and version. Instances are used as keys of the
     * per-extension bookkeeping maps, so value-based {@code equals} and {@code hashCode} are essential.
     *
     * <p>Declared with record syntax: the compiler generates the canonical constructor, the
     * {@code type()} and {@code version()} accessors, and {@code equals}, {@code hashCode} and
     * {@code toString} over both components. Visibility is kept as it appears in this file.
     *
     * @param type    the processor type
     * @param version the processor version
     */
    public record ExtensionId(String type, String version) {
    }

    /**
     * Captures the collaborators this bridge needs from the supplied initialization context.
     * Nothing is started here; see {@code start}.
     *
     * @param pythonBridgeInitializationContext source of the process configuration, the controller
     *                                          service type lookup and the NAR directory lookup
     */
    public void initialize(PythonBridgeInitializationContext pythonBridgeInitializationContext) {
        final PythonBridgeInitializationContext context = pythonBridgeInitializationContext;
        processConfig = context.getPythonProcessConfig();
        serviceTypeLookup = context.getControllerServiceTypeLookup();
        narDirectoryLookup = context.getNarDirectoryLookup();
    }

    /**
     * Launches the controller Python process unless the bridge is already running.
     *
     * <p>If anything fails during launch, {@code shutdown()} is invoked before the original
     * exception is rethrown.
     *
     * @throws IOException if launching the controller process fails with an I/O error
     */
    public synchronized void start() throws IOException {
        if (running) {
            logger.debug("{} already started, will not start again", this);
            return;
        }

        logger.debug("{} launching Python Process", this);
        try {
            LevelChangeListener.registerLogbackListener(StandardLogLevelChangeHandler.getHandler());

            // The controller process gets its own "controller" directory under the Python working directory.
            final File controllerWorkDir = new File(processConfig.getPythonWorkingDirectory(), "controller");
            controllerProcess = new PythonProcess(processConfig, serviceTypeLookup, controllerWorkDir, true, "Controller", "Controller");
            controllerProcess.start();
            running = true;
        } catch (final Exception launchFailure) {
            shutdown();
            throw launchFailure;
        }
    }

    /**
     * Asks the controller process to discover extensions in the configured extension directories.
     *
     * @param z when {@code true}, the NAR directories are searched in addition to the configured
     *          extension directories
     * @throws IllegalStateException if the bridge is not running
     */
    public void discoverExtensions(boolean z) {
        ensureStarted();

        final boolean includeNarDirectories = z;
        final List<String> searchPaths = processConfig.getPythonExtensionsDirectories().stream()
                .map(directory -> directory.getAbsolutePath())
                .collect(Collectors.toCollection(ArrayList::new));
        if (includeNarDirectories) {
            searchPaths.addAll(getNarDirectories());
        }

        final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        controllerProcess.discoverExtensions(searchPaths, workDirPath);
    }

    /**
     * Asks the controller process to discover extensions in exactly the given directories.
     *
     * @param list directories to search; each is passed on as its absolute path
     * @throws IllegalStateException if the bridge is not running
     */
    public void discoverExtensions(List<File> list) {
        ensureStarted();

        final List<String> searchPaths = list.stream()
                .map(File::getAbsolutePath)
                .toList();
        final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        controllerProcess.discoverExtensions(searchPaths, workDirPath);
    }

    /**
     * Creates the bridge to a Python processor inside a suitable Python process and records the new
     * processor in the per-extension count.
     *
     * @param str  identifier of the processor to create
     * @param str2 processor type
     * @param str3 processor version
     * @param z    forwarded to process selection, where it controls whether processes that already
     *             contain an isolated processor are skipped, and to the process's
     *             {@code createProcessor} call
     * @return the bridge created by the selected Python process
     * @throws IllegalStateException    if the bridge is not running
     * @throws IllegalArgumentException if no known processor matches the type and version
     */
    private PythonProcessorBridge createProcessorBridge(String str, String str2, String str3, boolean z) {
        ensureStarted();

        final String identifier = str;
        final String type = str2;
        final String version = str3;
        final boolean preferIsolatedProcess = z;

        final ExtensionId extensionId = findExtensionId(type, version)
                .orElseThrow(() -> new IllegalArgumentException("Processor Type [%s] Version [%s] not found".formatted(type, version)));
        logger.debug("Creating Python Processor Type [{}] Version [{}]", extensionId.type(), extensionId.version());

        final PythonProcessorDetails details = getProcessorTypes().stream()
                .filter(candidate -> candidate.getProcessorType().equals(type) && candidate.getProcessorVersion().equals(version))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "Could not find Processor Details for Python Processor type [%s] or version [%s]".formatted(type, version)));

        final PythonProcess process = getProcessForNextComponent(
                extensionId, identifier, details.getExtensionHome(), preferIsolatedProcess, details.isBundledWithDependencies());
        final String workDirPath = processConfig.getPythonWorkingDirectory().getAbsolutePath();
        final PythonProcessorBridge bridge = process.createProcessor(identifier, type, version, workDirPath, preferIsolatedProcess);

        // Only count the processor once it has actually been created.
        processorCountByType.merge(extensionId, 1, Integer::sum);
        return bridge;
    }

    /**
     * Creates a proxy for a Python processor. The proxy type is chosen from the interface name
     * reported in the processor's details; the bridge to the Python side is produced by a supplier
     * handed to the proxy rather than being created in this method.
     *
     * @param str  identifier of the processor
     * @param str2 processor type
     * @param str3 processor version
     * @param z    forwarded to {@code createProcessorBridge} when the supplier is invoked
     * @param z2   forwarded unchanged as the last argument of the proxy constructor
     * @return a proxy matching the processor's interface, or {@code null} when the reported interface
     *         is none of {@code FlowFileTransform}, {@code RecordTransform} or {@code FlowFileSource}
     * @throws IllegalStateException    if the bridge is not running
     * @throws IllegalArgumentException if no known processor matches the type and version
     */
    public AsyncLoadedProcessor createProcessor(String str, String str2, String str3, boolean z, boolean z2) {
        final String implementedInterface = getProcessorTypes().stream()
                .filter(details -> details.getProcessorType().equals(str2))
                .filter(details -> details.getProcessorVersion().equals(str3))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown Python Processor type [%s] or version [%s]".formatted(str2, str3)))
                .getInterface();

        // Typed supplier: bridge creation is deferred until the proxy asks for it.
        final Supplier<PythonProcessorBridge> bridgeFactory = () -> createProcessorBridge(str, str2, str3, z);

        if (FlowFileTransform.class.getName().equals(implementedInterface)) {
            return new FlowFileTransformProxy(str2, bridgeFactory, z2);
        }
        if (RecordTransform.class.getName().equals(implementedInterface)) {
            return new RecordTransformProxy(str2, bridgeFactory, z2);
        }
        if (FlowFileSource.class.getName().equals(implementedInterface)) {
            return new FlowFileSourceProxy(str2, bridgeFactory, z2);
        }

        // Unrecognised interface: callers receive null, as before.
        return null;
    }

    /**
     * Handles removal of a processor: detaches it from whichever Python process hosts it and
     * decrements the per-extension processor count.
     *
     * <p>The detach work runs on a separate virtual thread, so it is not covered by this method's
     * monitor. The count is updated on the calling thread right after that thread is started.
     *
     * @param str  identifier of the removed processor
     * @param str2 processor type
     * @param str3 processor version
     */
    public synchronized void onProcessorRemoved(String str, String str2, String str3) {
        final Optional<ExtensionId> extensionIdFound = findExtensionId(str2, str3);
        if (extensionIdFound.isEmpty()) {
            logger.debug("Processor Type [{}] Version [{}] not found", str2, str3);
            return;
        }

        final ExtensionId extensionId = extensionIdFound.get();
        final List<PythonProcess> processes = processesByProcessorType.get(extensionId);
        if (processes == null) {
            return;
        }

        Thread.ofVirtual()
                .name("Remove Python Processor " + str)
                .start(() -> removeProcessorFromProcesses(str, processes));

        // Never let the count drop below zero: it is used to derive a list index when selecting a
        // process, and a removal may arrive for a processor that was never counted.
        processorCountByType.computeIfPresent(extensionId, (id, count) -> Math.max(0, count - 1));
    }

    /**
     * Removes the processor from the given processes and shuts down the hosting process when the
     * removal leaves it without any processors. Failures are logged, not propagated.
     */
    private void removeProcessorFromProcesses(String identifier, List<PythonProcess> processes) {
        try {
            PythonProcess emptiedProcess = null;
            for (final PythonProcess candidate : processes) {
                if (candidate.removeProcessor(identifier) && candidate.getProcessorCount() == 0) {
                    emptiedProcess = candidate;
                    break;
                }
            }

            if (emptiedProcess != null) {
                processes.remove(emptiedProcess);
                emptiedProcess.shutdown();
            }
        } catch (final Exception e) {
            logger.error("Failed to trigger removal of Python Processor with ID {}", identifier, e);
        }
    }

    /**
     * Counts the Python processes currently tracked across all extensions. The controller process
     * is not held in the tracked map and is therefore not included.
     *
     * @return the sum of the sizes of all per-extension process lists
     */
    public int getTotalProcessCount() {
        return processesByProcessorType.values().stream()
                .mapToInt(List::size)
                .sum();
    }

    /**
     * Selects the Python process that should host the next processor of the given extension,
     * launching a new process when none of the existing ones can be used and the limits allow it.
     *
     * <p>Selection order:
     * <ol>
     *   <li>the first existing process for this extension that is acceptable, meaning {@code z} is
     *       {@code false} or the process contains no isolated processor;</li>
     *   <li>otherwise, if the extension already has more processes than the index derived from the
     *       processor count modulo the per-type maximum, the existing process at that index;</li>
     *   <li>otherwise a newly launched process, provided the global process limit is not reached.</li>
     * </ol>
     *
     * @param extensionId extension the processor belongs to
     * @param str         identifier of the component, passed to the new process
     * @param str2        extension home directory; used as the working directory when {@code z2} is set
     * @param z           whether processes that already contain an isolated processor must be skipped
     * @param z2          whether the extension is bundled with its dependencies
     * @return the process to create the processor in
     * @throws IllegalStateException if a new process is required but the global maximum is reached
     * @throws RuntimeException      if launching a new process fails with an I/O error
     */
    private synchronized PythonProcess getProcessForNextComponent(ExtensionId extensionId, String str, String str2, boolean z, boolean z2) {
        final int processorsOfThisType = processorCountByType.getOrDefault(extensionId, 0);
        final int maxProcessesPerType = processConfig.getMaxPythonProcessesPerType();
        final int processIndex = processorsOfThisType % maxProcessesPerType;

        final List<PythonProcess> processesForType = processesByProcessorType.computeIfAbsent(extensionId, key -> new CopyOnWriteArrayList<>());
        for (final PythonProcess candidate : processesForType) {
            if (!z || !candidate.containsIsolatedProcessor()) {
                logger.debug("Using {} to create Processor of type {}", candidate, extensionId.type());
                return candidate;
            }
        }

        if (processesForType.size() > processIndex) {
            final PythonProcess existingProcess = processesForType.get(processIndex);
            logger.warn("Using existing process {} to create Processor of type {} because configuration indicates that no more than {} processes should be created for any Processor Type. This may result in slower performance for Processors of this type",
                    existingProcess, extensionId.type(), maxProcessesPerType);
            return existingProcess;
        }

        PythonProcess launched = null;
        try {
            final int totalProcessCount = getTotalProcessCount();
            if (totalProcessCount >= processConfig.getMaxPythonProcesses()) {
                throw new IllegalStateException("Cannot launch new Python Process because the maximum number of processes allowed, according to nifi.properties, is " + processConfig.getMaxPythonProcesses() + " and there are currently " + totalProcessCount + " processes active");
            }

            // Report the number of running processes, not the number of extension types.
            logger.info("In order to create Python Processor of type {}, launching a new Python Process because there are currently {} Python Processors of this type and {} Python Processes",
                    extensionId.type(), processorsOfThisType, totalProcessCount);

            // Bundled extensions run from their own home; all others get <workDir>/extensions/<type>/<version>.
            final File processWorkDir = z2
                    ? new File(str2)
                    : new File(new File(new File(processConfig.getPythonWorkingDirectory(), "extensions"), extensionId.type()), extensionId.version());
            launched = new PythonProcess(processConfig, serviceTypeLookup, processWorkDir, z2, extensionId.type(), str);
            launched.start();

            final List<String> searchPaths = processConfig.getPythonExtensionsDirectories().stream()
                    .map(directory -> directory.getAbsolutePath())
                    .collect(Collectors.toCollection(ArrayList::new));
            searchPaths.addAll(getNarDirectories());
            launched.discoverExtensions(searchPaths, processConfig.getPythonWorkingDirectory().getAbsolutePath());

            processesForType.add(launched);
            return launched;
        } catch (final IOException e) {
            shutdownAfterFailedLaunch(launched, e);
            throw new RuntimeException(String.format("Failed to launch Process for Python Processor [%s] Version [%s]", extensionId.type(), extensionId.version()), e);
        } catch (final RuntimeException e) {
            shutdownAfterFailedLaunch(launched, e);
            throw e;
        }
    }

    /**
     * Shuts down a process whose launch did not complete so that it is not left behind untracked.
     * A failure during that shutdown is attached to {@code cause} as a suppressed exception.
     */
    private static void shutdownAfterFailedLaunch(PythonProcess process, Exception cause) {
        if (process == null) {
            return;
        }
        try {
            process.shutdown();
        } catch (final RuntimeException shutdownFailure) {
            cause.addSuppressed(shutdownFailure);
        }
    }

    /**
     * Returns the processor types currently reported by the controller process.
     *
     * @return details of every processor type reported by the controller
     * @throws IllegalStateException if the bridge is not running
     */
    public List<PythonProcessorDetails> getProcessorTypes() {
        ensureStarted();
        return controllerProcess.getCurrentController().getProcessorTypes();
    }

    /**
     * Reports how many Python processes are tracked for each extension.
     *
     * @return a new map whose keys have the form {@code "<type> version <version>"} and whose values
     *         are the number of processes tracked for that extension
     */
    public synchronized Map<String, Integer> getProcessCountsPerType() {
        final Map<String, Integer> countsByType = new HashMap<>(processesByProcessorType.size());
        for (final Map.Entry<ExtensionId, List<PythonProcess>> entry : processesByProcessorType.entrySet()) {
            final ExtensionId extensionId = entry.getKey();
            final String label = extensionId.type() + " version " + extensionId.version();
            countsByType.put(label, entry.getValue().size());
        }
        return countsByType;
    }

    /**
     * Asks the controller process to remove a processor type.
     *
     * @param str  processor type, forwarded as the first argument
     * @param str2 second argument forwarded to the controller (presumably the version; confirm
     *             against the controller interface)
     * @throws IllegalStateException if the bridge is not running
     */
    public void removeProcessorType(String str, String str2) {
        ensureStarted();
        controllerProcess.getCurrentController().removeProcessorType(str, str2);
    }

    /**
     * Collects the Java object binding counts of every tracked Python process.
     *
     * @return one entry per tracked process, carrying the process description, the extension type
     *         and version, and that process's binding counts
     */
    public synchronized List<BoundObjectCounts> getBoundObjectCounts() {
        final List<BoundObjectCounts> allCounts = new ArrayList<>();
        for (final Map.Entry<ExtensionId, List<PythonProcess>> entry : processesByProcessorType.entrySet()) {
            final ExtensionId extensionId = entry.getKey();
            for (final PythonProcess process : entry.getValue()) {
                allCounts.add(new StandardBoundObjectCounts(
                        process.toString(), extensionId.type(), extensionId.version(), process.getJavaObjectBindingCounts()));
            }
        }
        return allCounts;
    }

    /**
     * Guards operations that need the controller process.
     *
     * @throws IllegalStateException if the bridge is not currently running
     */
    private void ensureStarted() {
        if (running) {
            return;
        }
        throw new IllegalStateException("Cannot perform action because " + this + " is not currently running");
    }

    /**
     * Marks the bridge as stopped, then shuts down every tracked Python process followed by the
     * controller process, if one was created.
     */
    public synchronized void shutdown() {
        logger.info("Shutting down Python Server");
        running = false;

        for (final List<PythonProcess> processes : processesByProcessorType.values()) {
            for (final PythonProcess process : processes) {
                process.shutdown();
            }
        }

        // The controller may be absent when shutdown runs before or during a failed start.
        if (controllerProcess != null) {
            controllerProcess.shutdown();
        }
        logger.info("Successfully shutdown Python Server");
    }

    /**
     * Pings the controller of the controller process. No started-check is performed here, so the
     * controller process must already have been created.
     */
    public void ping() {
        controllerProcess.getCurrentController().ping();
    }

    /**
     * Returns the fixed label used for this bridge in log and error messages.
     */
    @Override
    public String toString() {
        return "StandardPythonBridge";
    }

    /**
     * Resolves the NAR directories provided by the lookup to their absolute paths.
     *
     * @return the set of absolute paths of the current NAR directories
     */
    private Set<String> getNarDirectories() {
        final Set<File> narDirectories = narDirectoryLookup.get();
        return narDirectories.stream()
                .map(File::getAbsolutePath)
                .collect(Collectors.toSet());
    }

    /**
     * Looks up the extension with the given type and version among the processor types reported by
     * the controller. No started-check is performed here.
     *
     * @param str  processor type to match
     * @param str2 processor version to match
     * @return the identifier of the first matching processor type, or an empty Optional if none matches
     */
    private Optional<ExtensionId> findExtensionId(String str, String str2) {
        final List<PythonProcessorDetails> knownTypes = controllerProcess.getCurrentController().getProcessorTypes();
        return knownTypes.stream()
                .filter(details -> details.getProcessorType().equals(str) && details.getProcessorVersion().equals(str2))
                .findFirst()
                .map(details -> new ExtensionId(details.getProcessorType(), details.getProcessorVersion()));
    }
}
