package org.apache.nifi.py4j;

import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.python.PythonController;
import org.apache.nifi.python.processor.PythonProcessorAdapter;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorInitializationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/py4j/StandardPythonProcessorBridge.class */
/**
 * Bridge between the Java side of a Python-based processor and the Python object that implements it.
 *
 * <p>Initialization of the Python side is performed on a virtual thread started by
 * {@link #initialize(PythonProcessorInitializationContext)} or {@link #replaceController(PythonController)}.
 * Progress is exposed through {@link #getLoadState()}, and the resulting adapter (once available) through
 * {@link #getProcessorAdapter()}.</p>
 *
 * <p>Mutable state is held in {@code volatile} fields; no locks are taken by this class.</p>
 */
public class StandardPythonProcessorBridge implements PythonProcessorBridge {
    private static final Logger logger = LoggerFactory.getLogger(StandardPythonProcessorBridge.class);

    /** Starting value of the retry delay; it is doubled before the first sleep, so the first wait is twice this. */
    private static final long INITIAL_RETRY_DELAY_MILLIS = 1000L;

    /** Upper bound for the delay between two retries. */
    private static final long MAX_RETRY_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(10L);

    private final ProcessorCreationWorkflow creationWorkflow;
    private final String processorType;
    private final String processorVersion;
    private volatile PythonProcessorAdapter adapter;
    private final File workingDir;
    private final File moduleFile;
    private volatile long lastModified;
    private volatile AsyncLoadedProcessor.LoadState loadState = AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
    private volatile PythonProcessorInitializationContext initializationContext;
    private volatile String identifier;
    private volatile PythonController controller;
    private volatile CompletableFuture<Void> initializationFuture;

    /**
     * Fluent builder for {@link StandardPythonProcessorBridge}. Every property is mandatory;
     * {@link #build()} rejects a builder in which any of them is still {@code null}.
     */
    public static class Builder {
        private PythonController controller;
        private ProcessorCreationWorkflow creationWorkflow;
        private File workDir;
        private File moduleFile;
        private String processorType;
        private String processorVersion;

        /**
         * @param controller the controller used to ask the Python side to reload the processor
         * @return this builder
         */
        public Builder controller(final PythonController controller) {
            this.controller = controller;
            return this;
        }

        /**
         * @param creationWorkflow the workflow used to download dependencies and create the processor
         * @return this builder
         */
        public Builder creationWorkflow(final ProcessorCreationWorkflow creationWorkflow) {
            this.creationWorkflow = creationWorkflow;
            return this;
        }

        /**
         * @param processorType the type of the processor
         * @return this builder
         */
        public Builder processorType(final String processorType) {
            this.processorType = processorType;
            return this;
        }

        /**
         * @param processorVersion the version of the processor
         * @return this builder
         */
        public Builder processorVersion(final String processorVersion) {
            this.processorVersion = processorVersion;
            return this;
        }

        /**
         * @param workingDirectory the working directory whose absolute path is passed to the controller on reload
         * @return this builder
         */
        public Builder workingDirectory(final File workingDirectory) {
            this.workDir = workingDirectory;
            return this;
        }

        /**
         * @param moduleFile the file whose last-modified timestamp decides whether a reload is needed
         * @return this builder
         */
        public Builder moduleFile(final File moduleFile) {
            this.moduleFile = moduleFile;
            return this;
        }

        /**
         * Creates the bridge.
         *
         * @return a new bridge configured from this builder
         * @throws IllegalStateException if any property has not been set
         */
        public StandardPythonProcessorBridge build() {
            requireSet(this.controller, "PythonController");
            requireSet(this.creationWorkflow, "Processor Creation Workflow");
            requireSet(this.processorType, "Processor Type");
            requireSet(this.processorVersion, "Processor Version");
            requireSet(this.workDir, "Working Directory");
            requireSet(this.moduleFile, "Module File");
            return new StandardPythonProcessorBridge(this);
        }

        /** Throws {@link IllegalStateException} naming the missing property when {@code value} is null. */
        private static void requireSet(final Object value, final String propertyName) {
            if (value == null) {
                throw new IllegalStateException("Must specify the " + propertyName);
            }
        }
    }

    private StandardPythonProcessorBridge(final Builder builder) {
        this.controller = builder.controller;
        this.creationWorkflow = builder.creationWorkflow;
        this.processorType = builder.processorType;
        this.processorVersion = builder.processorVersion;
        this.workingDir = builder.workDir;
        this.moduleFile = builder.moduleFile;
        // Baseline against which reload() later detects modifications.
        this.lastModified = this.moduleFile.lastModified();
    }

    /**
     * @return the adapter for the Python processor, or an empty Optional while none has been created
     *         (for example before initialization finished or right after the controller was replaced)
     */
    public Optional<PythonProcessorAdapter> getProcessorAdapter() {
        return Optional.ofNullable(this.adapter);
    }

    /**
     * Records the initialization context and starts initializing the Python side on a virtual thread.
     * A previously started initialization is cancelled first.
     *
     * @param context the context handed to the Python processor once it has been created
     */
    public void initialize(final PythonProcessorInitializationContext context) {
        cancelPendingInitialization();
        this.initializationContext = context;
        this.identifier = context.getIdentifier();
        startInitialization("Initialize Python Processor %s (%s)");
    }

    /**
     * Switches to a new controller, discards the current adapter and re-initializes the Python side
     * on a virtual thread. A previously started initialization is cancelled first.
     *
     * @param controller the controller to use from now on
     */
    public void replaceController(final PythonController controller) {
        cancelPendingInitialization();
        this.controller = controller;
        this.adapter = null;
        startInitialization("Re-Initialize Python Processor %s (%s)");
    }

    /**
     * @return the most recently recorded state of the asynchronous load
     */
    public AsyncLoadedProcessor.LoadState getLoadState() {
        return this.loadState;
    }

    /** Cancels the initialization started most recently, if there is one. */
    private void cancelPendingInitialization() {
        final CompletableFuture<Void> pending = this.initializationFuture;
        if (pending != null) {
            // CompletableFuture ignores the interrupt flag; the worker notices the cancellation by polling.
            pending.cancel(true);
        }
    }

    /** Registers a fresh future and runs the retrying initialization on a virtual thread named from the format. */
    private void startInitialization(final String threadNameFormat) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        this.initializationFuture = future;
        final String threadName = threadNameFormat.formatted(this.identifier, getProcessorType());
        Thread.ofVirtual().name(threadName).start(() -> initializePythonSide(true, future));
    }

    /**
     * Downloads dependencies (unless the processor is packaged with them), then creates and initializes
     * the Python processor, updating the load state along the way.
     *
     * <p>Both phases share one retry delay: a delay that grew during dependency download is carried over
     * into the code-loading phase.</p>
     *
     * @param continualRetry if {@code true}, failures are logged and retried with a growing delay until the
     *                       future is cancelled; if {@code false}, the first failure is rethrown
     * @param future         completed when this method finishes normally; cancelling it stops further attempts
     */
    private void initializePythonSide(final boolean continualRetry, final CompletableFuture<Void> future) {
        if (this.initializationContext == null) {
            // Nothing to initialize with yet; initialize(...) has not been called.
            future.complete(null);
            return;
        }

        long retryDelayMillis = INITIAL_RETRY_DELAY_MILLIS;

        while (!future.isCancelled()) {
            if (this.creationWorkflow.isPackagedWithDependencies()) {
                this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
                break;
            }

            this.loadState = AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
            try {
                this.creationWorkflow.downloadDependencies();
                logger.info("Successfully downloaded dependencies for Python Processor {} ({})", this.identifier, getProcessorType());
                break;
            } catch (final Exception e) {
                this.loadState = AsyncLoadedProcessor.LoadState.DEPENDENCY_DOWNLOAD_FAILED;
                if (!continualRetry) {
                    throw e;
                }

                retryDelayMillis = nextRetryDelay(retryDelayMillis);
                logger.error("Failed to download dependencies for Python Processor {} ({}). Will try again in {} millis",
                        this.identifier, getProcessorType(), retryDelayMillis, e);
                if (!sleepBeforeRetry(retryDelayMillis, e)) {
                    throw e;
                }
            }
        }

        while (!future.isCancelled()) {
            this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
            try {
                final PythonProcessorAdapter createdAdapter = this.creationWorkflow.createProcessor();
                createdAdapter.initialize(this.initializationContext);

                if (future.isCancelled()) {
                    // This attempt was superseded while the processor was being created. Publishing the result
                    // would overwrite state owned by the newer initialization (e.g. undo the adapter reset done
                    // by replaceController). This is a check without a lock, so it narrows the window rather
                    // than eliminating it.
                    // NOTE(review): the discarded adapter is not released here - confirm whether the Python
                    // side needs explicit cleanup.
                    logger.debug("Discarding superseded initialization of Python Processor {} ({})", this.identifier, getProcessorType());
                    break;
                }

                this.adapter = createdAdapter;
                this.loadState = AsyncLoadedProcessor.LoadState.FINISHED_LOADING;
                logger.info("Successfully loaded Python Processor {} ({})", this.identifier, getProcessorType());
                break;
            } catch (final Exception e) {
                this.loadState = AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE_FAILED;
                if (!continualRetry) {
                    throw e;
                }

                retryDelayMillis = nextRetryDelay(retryDelayMillis);
                logger.error("Failed to load code for Python Processor {} ({}). Will try again in {} millis",
                        this.identifier, getProcessorType(), retryDelayMillis, e);
                if (!sleepBeforeRetry(retryDelayMillis, e)) {
                    throw e;
                }
            }
        }

        // No effect when the future has already been cancelled.
        future.complete(null);
    }

    /** Doubles the delay, capped at {@link #MAX_RETRY_DELAY_MILLIS}. */
    private static long nextRetryDelay(final long currentDelayMillis) {
        return Math.min(currentDelayMillis * 2, MAX_RETRY_DELAY_MILLIS);
    }

    /**
     * Sleeps for the given time before the next attempt.
     *
     * @return {@code true} if the full sleep elapsed; {@code false} if the thread was interrupted, in which case
     *         the interrupt flag is restored and the interruption is attached to {@code failure} as suppressed
     */
    private static boolean sleepBeforeRetry(final long delayMillis, final Exception failure) {
        try {
            Thread.sleep(delayMillis);
            return true;
        } catch (final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            failure.addSuppressed(interrupted);
            return false;
        }
    }

    /**
     * @return the type of the processor this bridge was built for
     */
    public String getProcessorType() {
        return this.processorType;
    }

    /**
     * Reloads the processor if the module file has been modified since it was last loaded. The reload runs
     * synchronously on the calling thread without retries, so a failure is propagated to the caller.
     *
     * @return {@code true} if the module file changed and the processor was reloaded, {@code false} otherwise
     */
    public boolean reload() {
        // Read the timestamp once and remember exactly that value, so that a modification made while the
        // reload is in progress is still seen as "newer" by the next call.
        final long currentModified = this.moduleFile.lastModified();
        if (currentModified <= this.lastModified) {
            logger.debug("Processor {} has not been modified since it was last loaded so will not reload", getProcessorType());
            return false;
        }

        this.controller.reloadProcessor(getProcessorType(), this.processorVersion, this.workingDir.getAbsolutePath());
        initializePythonSide(false, new CompletableFuture<>());
        this.lastModified = currentModified;
        return true;
    }
}
