package org.apache.nifi.processors.stateless;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceReference;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.RegistryDataflowProvider;
import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stream.io.StreamUtils;

@SupportsSensitiveDynamicProperties
@DynamicProperty(name = "Any Parameter name", value = "Any value", description = "Any dynamic property that is added will be provided to the stateless flow as a Parameter. The name of the property will be the name of the Parameter, and the value of the property will be the value of the Parameter. Because Parameter values may or may not be sensitive, all dynamic properties will be considered sensitive in order to protect their integrity.")
@CapabilityDescription("Runs the configured dataflow using the Stateless NiFi engine. Please see documentation in order to understand the differences between the standard NiFi runtime engine and the Stateless NiFi engine. If the Processor is configured with an incoming connection, the incoming FlowFiles will be queued up into the specified Input Port in the dataflow. Data that is transferred out of the flow via an Output Port will be sent to the 'output' relationship, and an attribute will be added to indicate which Port that FlowFile was transferred to. See Additional Details for more information.")
@Restricted
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = "output.port.name", description = "The name of the Output Port that the FlowFile was transferred to"), @WritesAttribute(attribute = "failure.port.name", description = "If one or more FlowFiles is routed to one of the Output Ports that is configured as a Failure Port, the input FlowFile (if any) will have this attribute added to it, indicating the name of the Port that caused the dataflow to be considered a failure.")})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@SystemResourceConsiderations({@SystemResourceConsideration(resource = SystemResource.CPU), @SystemResourceConsideration(resource = SystemResource.DISK), @SystemResourceConsideration(resource = SystemResource.MEMORY), @SystemResourceConsideration(resource = SystemResource.NETWORK)})
/* loaded from: input_file:org/apache/nifi/processors/stateless/ExecuteStateless.class */
public class ExecuteStateless extends AbstractProcessor implements Searchable {
    public static final AllowableValue SPEC_FROM_FILE = new AllowableValue("Use Local File", "Use Local File or URL", "Dataflow to run is stored as a file on the NiFi server or at a URL that is accessible to the NiFi server");
    public static final AllowableValue SPEC_FROM_REGISTRY = new AllowableValue("Use NiFi Registry", "Use NiFi Registry", "Dataflow to run is stored in NiFi Registry");
    public static final AllowableValue CONTENT_STORAGE_HEAP = new AllowableValue("Store Content on Heap", "Store Content on Heap", "The FlowFile content will be stored on the NiFi JVM's heap. This is the most efficient option for small FlowFiles but can quickly exhaust the heap with larger FlowFiles, resulting in Out Of Memory Errors and node instability.");
    public static final AllowableValue CONTENT_STORAGE_DISK = new AllowableValue("Store Content on Disk", "Store Content on Disk", "The FlowFile content will be stored on disk, within the configured Work Directory. The content will still be cleared between invocations and will not be persisted across restarts.");
    public static final PropertyDescriptor DATAFLOW_SPECIFICATION_STRATEGY = new PropertyDescriptor.Builder().name("Dataflow Specification Strategy").displayName("Dataflow Specification Strategy").description("Specifies how the Processor should obtain a copy of the dataflow that it is to run").required(true).allowableValues(new DescribedValue[]{SPEC_FROM_FILE, SPEC_FROM_REGISTRY}).defaultValue(SPEC_FROM_FILE.getValue()).build();
    public static final PropertyDescriptor DATAFLOW_FILE = new PropertyDescriptor.Builder().name("Dataflow File").displayName("Dataflow File/URL").description("The filename or URL that specifies the dataflow that is to be run").required(true).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[]{ResourceType.URL}).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_FILE}).build();
    public static final PropertyDescriptor REGISTRY_URL = new PropertyDescriptor.Builder().name("Registry URL").displayName("Registry URL").description("The URL of the NiFi Registry to retrieve the flow from").required(true).addValidator(StandardValidators.URL_VALIDATOR).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("Registry SSL Context Service").displayName("Registry SSL Context Service").description("The SSL Context Service to use for interacting with the NiFi Registry").required(false).identifiesControllerService(SSLContextService.class).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).build();
    public static final PropertyDescriptor COMMS_TIMEOUT = new PropertyDescriptor.Builder().name("Communications Timeout").displayName("Communications Timeout").description("Specifies how long to wait before timing out when attempting to communicate with NiFi Registry").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).defaultValue("15 secs").build();
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder().name("Registry Bucket").displayName("Registry Bucket").description("The name of the Bucket in the NiFi Registry that the flow should retrieved from").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).build();
    public static final PropertyDescriptor FLOW_NAME = new PropertyDescriptor.Builder().name("Flow Name").displayName("Flow Name").description("The name of the flow in the NiFi Registry").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).build();
    public static final PropertyDescriptor FLOW_VERSION = new PropertyDescriptor.Builder().name("Flow Version").displayName("Flow Version").description("The version of the flow in the NiFi Registry that should be retrieved. If not specified, the latest version will always be used.").required(false).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).dependsOn(DATAFLOW_SPECIFICATION_STRATEGY, new AllowableValue[]{SPEC_FROM_REGISTRY}).build();
    public static final PropertyDescriptor INPUT_PORT = new PropertyDescriptor.Builder().name("Input Port").displayName("Input Port").description("Specifies the name of the Input Port to send incoming FlowFiles to. This property is required if this processor has any incoming connections.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor FAILURE_PORTS = new PropertyDescriptor.Builder().name("Failure Ports").displayName("Failure Ports").description("A comma-separated list of the names of Output Ports that exist at the root level of the dataflow. If any FlowFile is routed to one of the Ports whose name is listed here, the dataflow will be considered a failure, and the incoming FlowFile (if any) will be routed to 'failure'. If not specified, all Output Ports will be considered successful.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor DATAFLOW_TIMEOUT = new PropertyDescriptor.Builder().name("Dataflow Timeout").displayName("Dataflow Timeout").description("If the flow does not complete within this amount of time, the incoming FlowFile, if any, will be routed to the timeout relationship,the dataflow will be cancelled, and the invocation will end.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("60 sec").build();
    public static final PropertyDescriptor LIB_DIRECTORY = new PropertyDescriptor.Builder().name("NAR Directory").displayName("NAR Directory").description("This directory has three roles: 1) it contains the NiFi Stateless NAR and other necessary libraries required for the Stateless engine to be bootstrapped, 2) it can contain extensions that should be loaded by the Stateless engine, 3) it is used by the Stateless engine to download extensions into.").required(true).addValidator(StandardValidators.createDirectoryExistsValidator(false, false)).defaultValue("./lib").build();
    public static final PropertyDescriptor ADDITIONAL_LIB_DIRECTORIES = new PropertyDescriptor.Builder().name("additional-nar-directories").displayName("Additional NAR Directories").description("A comma-separated list of paths for directories that contain extensions that should be loaded by the stateless engine. The engine will not download any extensions into these directories or write to them but will read any NAR files that are found within these directories. The engine will not recurse into subdirectories of these directories.").required(false).addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createDirectoryExistsValidator(false, false))).build();
    public static final PropertyDescriptor WORKING_DIRECTORY = new PropertyDescriptor.Builder().name("Work Directory").displayName("Work Directory").description("A directory that can be used to create temporary files, such as expanding NAR files, temporary FlowFile content, caching the dataflow, etc.").required(true).addValidator(StandardValidators.createDirectoryExistsValidator(false, true)).defaultValue("./work").build();
    public static final PropertyDescriptor KRB5_CONF = new PropertyDescriptor.Builder().name("Krb5 Conf File").displayName("Krb5 Conf File").description("The KRB5 Conf file to use for configuring components that rely on Kerberos").required(false).identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, new ResourceType[0]).build();
    public static final PropertyDescriptor STATELESS_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("Stateless SSL Context Service").displayName("Stateless SSL Context Service").description("The SSL Context to use as the Stateless System SSL Context").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final PropertyDescriptor MAX_INGEST_FLOWFILES = new PropertyDescriptor.Builder().name("Max Ingest FlowFiles").displayName("Max Ingest FlowFiles").description("During the course of a stateless dataflow, some processors may require more data than they have available in order to proceed. For example, MergeContent may require a minimum number of FlowFiles before it can proceed. In this case, the dataflow may bring in additional data from its source Processor. However, this data may all be held in memory, so this property provides a mechanism for limiting the maximum number of FlowFiles that the source Processor can ingest before it will no longer be triggered to ingest additional data.").required(false).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor MAX_INGEST_DATA_SIZE = new PropertyDescriptor.Builder().name("Max Ingest Data Size").displayName("Max Ingest Data Size").description("During the course of a stateless dataflow, some processors may require more data than they have available in order to proceed. For example, MergeContent may require a minimum number of FlowFiles before it can proceed. In this case, the dataflow may bring in additional data from its source Processor. However, this data may all be held in memory, so this property provides a mechanism for limiting the maximum amount of data that the source Processor can ingest before it will no longer be triggered to ingest additional data.").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor CONTENT_STORAGE_STRATEGY = new PropertyDescriptor.Builder().name("Content Storage Strategy").displayName("Content Storage Strategy").description("Specifies where the content of FlowFiles that the Stateless dataflow is operating on should be stored. Note that the data is always considered temporary and may be deleted at any time. It is not intended to be persisted across restarted.").required(true).allowableValues(new DescribedValue[]{CONTENT_STORAGE_HEAP, CONTENT_STORAGE_DISK}).defaultValue(CONTENT_STORAGE_DISK.getValue()).build();
    public static final PropertyDescriptor MAX_INPUT_FLOWFILE_SIZE = new PropertyDescriptor.Builder().name("Max Input FlowFile Size").displayName("Max Input FlowFile Size").description("This Processor is configured to load all incoming FlowFiles into memory. Because of that, it is important to limit the maximum size of any incoming FlowFile that would get loaded into memory, in order to prevent Out Of Memory Errors and excessive Garbage Collection. Any FlowFile whose content size is greater than the configured size will be routed to failure and not sent to the Stateless Engine.").required(true).dependsOn(CONTENT_STORAGE_STRATEGY, new AllowableValue[]{CONTENT_STORAGE_HEAP}).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("1 MB").build();
    public static final PropertyDescriptor STATUS_TASK_INTERVAL = new PropertyDescriptor.Builder().name("Status Task Interval").displayName("Status Task Interval").description("The Stateless engine periodically logs the status of the dataflow's processors.  This property allows the interval to be changed, or the status logging to be skipped altogether if the property is not set.").required(false).addValidator(StandardValidators.createTimePeriodValidator(10, TimeUnit.SECONDS, 24, TimeUnit.HOURS)).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("For any incoming FlowFile that is successfully processed, the original incoming FlowFile will be transferred to this Relationship").autoTerminateDefault(true).build();
    static final Relationship REL_OUTPUT = new Relationship.Builder().name("output").description("Any FlowFiles that are transferred to an Output Port in the configured dataflow will be routed to this Relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If the dataflow fails to process an incoming FlowFile, that FlowFile will be routed to this relationship").build();
    static final Relationship REL_TIMEOUT = new Relationship.Builder().name("timeout").description("If the dataflow fails to complete in the configured amount of time, any incoming FlowFile will be routed to this relationship").build();
    private final BlockingQueue<StatelessDataflow> dataflows = new LinkedBlockingDeque();
    private final AtomicInteger dataflowCreationCount = new AtomicInteger(0);
    private volatile Set<String> failurePortNames;
    private volatile VersionedFlowSnapshot flowSnapshot;
    private volatile AbortableTriggerContext triggerContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/stateless/ExecuteStateless$AbortableTriggerContext.class */
    public static class AbortableTriggerContext implements DataflowTriggerContext {
        private volatile boolean aborted = false;

        private AbortableTriggerContext() {
        }

        public boolean isAbort() {
            return this.aborted;
        }

        public void abort() {
            this.aborted = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/stateless/ExecuteStateless$VersionedComponentSearchResults.class */
    public static class VersionedComponentSearchResults {
        private final String term;
        private final List<SearchResult> results = new ArrayList();

        public VersionedComponentSearchResults(String str) {
            this.term = str;
        }

        public void add(String str, String str2) {
            if (str != null && str.contains(this.term)) {
                this.results.add(new SearchResult.Builder().match(str).label(str2).build());
            }
        }

        public List<SearchResult> toList() {
            return this.results;
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Arrays.asList(DATAFLOW_SPECIFICATION_STRATEGY, DATAFLOW_FILE, REGISTRY_URL, SSL_CONTEXT_SERVICE, COMMS_TIMEOUT, BUCKET, FLOW_NAME, FLOW_VERSION, INPUT_PORT, FAILURE_PORTS, CONTENT_STORAGE_STRATEGY, MAX_INPUT_FLOWFILE_SIZE, DATAFLOW_TIMEOUT, LIB_DIRECTORY, ADDITIONAL_LIB_DIRECTORIES, WORKING_DIRECTORY, MAX_INGEST_FLOWFILES, MAX_INGEST_DATA_SIZE, STATELESS_SSL_CONTEXT_SERVICE, KRB5_CONF, STATUS_TASK_INTERVAL);
    }

    public Set<Relationship> getRelationships() {
        return new HashSet(Arrays.asList(REL_ORIGINAL, REL_OUTPUT, REL_FAILURE, REL_TIMEOUT));
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().name(str).defaultValue("Value for the " + str + " parameter").addValidator(Validator.VALID).dynamic(true).build();
    }

    @OnScheduled
    public void parseDataflow(ProcessContext processContext) throws IOException {
        CachingDataflowProvider cachingDataflowProvider = new CachingDataflowProvider(getIdentifier(), getLogger(), processContext.getProperty(DATAFLOW_SPECIFICATION_STRATEGY).getValue().equalsIgnoreCase(SPEC_FROM_FILE.getValue()) ? new FileSystemDataflowProvider() : new RegistryDataflowProvider(getLogger()));
        long nanoTime = System.nanoTime();
        VersionedFlowSnapshot retrieveDataflowContents = cachingDataflowProvider.retrieveDataflowContents(processContext);
        getLogger().info("Successfully retrieved flow in {} millis", new Object[]{Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime))});
        this.flowSnapshot = retrieveDataflowContents;
        this.triggerContext = new AbortableTriggerContext();
        HashSet hashSet = new HashSet();
        String value = processContext.getProperty(FAILURE_PORTS).getValue();
        if (value != null) {
            for (String str : value.split(",")) {
                hashSet.add(str.trim());
            }
        }
        this.failurePortNames = hashSet;
    }

    @OnUnscheduled
    public void abortDataflow() {
        if (this.triggerContext != null) {
            this.triggerContext.abort();
        }
    }

    @OnStopped
    public void shutdown() {
        while (true) {
            StatelessDataflow poll = this.dataflows.poll();
            if (poll == null) {
                this.dataflows.clear();
                this.dataflowCreationCount.set(0);
                return;
            }
            poll.shutdown();
        }
    }

    private StatelessDataflow createDataflow(ProcessContext processContext) throws IOException, StatelessConfigurationException {
        StatelessDataflow createDataflow = StatelessBootstrap.bootstrap(createEngineConfiguration(processContext, this.dataflowCreationCount.getAndIncrement()), Thread.currentThread().getContextClassLoader()).createDataflow(createDataflowDefinition(processContext, this.flowSnapshot));
        createDataflow.initialize();
        return createDataflow;
    }

    private StatelessDataflow getDataflow(ProcessContext processContext) throws IOException, StatelessConfigurationException {
        StatelessDataflow poll = this.dataflows.poll();
        return poll == null ? createDataflow(processContext) : poll;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = null;
        if (processContext.hasIncomingConnection()) {
            flowFile = processSession.get();
            if (flowFile == null) {
                return;
            }
        }
        try {
            StatelessDataflow dataflow = getDataflow(processContext);
            try {
                runDataflow(dataflow, flowFile, processContext, processSession);
                this.dataflows.offer(dataflow);
            } catch (Throwable th) {
                this.dataflows.offer(dataflow);
                throw th;
            }
        } catch (Exception e) {
            getLogger().error("Could not create dataflow from snapshot", e);
            processSession.rollback();
        }
    }

    private void runDataflow(StatelessDataflow statelessDataflow, FlowFile flowFile, ProcessContext processContext, ProcessSession processSession) {
        Optional empty;
        try {
            long longValue = processContext.getProperty(DATAFLOW_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
            if (flowFile == null || enqueueFlowFile(flowFile, statelessDataflow, processContext, processSession)) {
                statelessDataflow.resetCounters();
                BulletinRepository bulletinRepository = statelessDataflow.getBulletinRepository();
                long maxBulletinId = bulletinRepository.getMaxBulletinId();
                DataflowTrigger trigger = statelessDataflow.trigger(this.triggerContext);
                boolean z = false;
                try {
                    try {
                        empty = trigger.getResult(longValue, TimeUnit.MILLISECONDS);
                        surfaceBulletins(bulletinRepository, maxBulletinId);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        z = true;
                        empty = Optional.empty();
                        trigger.cancel();
                        surfaceBulletins(bulletinRepository, maxBulletinId);
                    }
                    if (!empty.isPresent()) {
                        z = true;
                    }
                    if (z) {
                        getLogger().error("Dataflow did not complete within the allotted time of {} milliseconds for {}. Routing to timeout.", new Object[]{Long.valueOf(longValue), flowFile});
                        if (flowFile != null) {
                            processSession.transfer(flowFile, REL_TIMEOUT);
                        }
                        trigger.cancel();
                        return;
                    }
                    TriggerResult triggerResult = (TriggerResult) empty.get();
                    if (!triggerResult.isSuccessful()) {
                        Optional failureCause = triggerResult.getFailureCause();
                        if (failureCause.isPresent()) {
                            FailurePortEncounteredException failurePortEncounteredException = (Throwable) failureCause.get();
                            if (flowFile == null) {
                                getLogger().error("Dataflow failed to complete successfully. Yielding.", (Throwable) failureCause.get());
                            } else {
                                getLogger().error("Dataflow failed to complete successfully for {}. Routing to failure and yielding.", new Object[]{flowFile, failureCause.get()});
                                if (failurePortEncounteredException instanceof FailurePortEncounteredException) {
                                    processSession.putAttribute(flowFile, "failure.port.name", failurePortEncounteredException.getPortName());
                                }
                            }
                        }
                        if (flowFile != null) {
                            processSession.transfer(flowFile, REL_FAILURE);
                        }
                        adjustCounters(processSession, statelessDataflow, " (Failed attempts)");
                        processSession.adjustCounter("Failed Invocations", 1L, false);
                        processContext.yield();
                        return;
                    }
                    try {
                        Set<FlowFile> createOutputFlowFiles = createOutputFlowFiles((TriggerResult) empty.get(), processSession, flowFile);
                        adjustCounters(processSession, statelessDataflow, null);
                        long sourceYieldExpiration = statelessDataflow.getSourceYieldExpiration();
                        if (sourceYieldExpiration > 0 && sourceYieldExpiration - System.currentTimeMillis() > 0) {
                            processContext.yield();
                        }
                        if (flowFile != null) {
                            processSession.transfer(flowFile, REL_ORIGINAL);
                        }
                        processSession.transfer(createOutputFlowFiles, REL_OUTPUT);
                        Objects.requireNonNull(triggerResult);
                        processSession.commitAsync(triggerResult::acknowledge);
                        if (flowFile == null) {
                            getLogger().info("Successfully triggered dataflow to run, producing {} output FlowFiles", new Object[]{Integer.valueOf(createOutputFlowFiles.size())});
                        } else {
                            getLogger().info("Successfully triggered dataflow to run against {}, producing {} output FlowFiles", new Object[]{flowFile, Integer.valueOf(createOutputFlowFiles.size())});
                        }
                        processSession.adjustCounter("Successful Invocations", 1L, false);
                    } catch (IOException e2) {
                        getLogger().error("Failed to write FlowFile contents that were output from Stateless Flow to the NiFi content repository for {}. Routing to failure.", new Object[]{flowFile, e2});
                        if (flowFile != null) {
                            processSession.transfer(flowFile, REL_FAILURE);
                        }
                    }
                } catch (Throwable th) {
                    surfaceBulletins(bulletinRepository, maxBulletinId);
                    throw th;
                }
            }
        } catch (Exception e3) {
            getLogger().error("Failed to determine Dataflow Timeout for {}. Routing to failure", new Object[]{flowFile});
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    private void surfaceBulletins(BulletinRepository bulletinRepository, long j) {
        for (Bulletin bulletin : bulletinRepository.findBulletins(new BulletinQuery.Builder().after(Long.valueOf(j)).build())) {
            try {
                String level = bulletin.getLevel();
                if (level == null || level.equalsIgnoreCase("WARNING")) {
                    level = "WARN";
                }
                LogLevel valueOf = LogLevel.valueOf(level);
                if (valueOf != LogLevel.DEBUG && valueOf != LogLevel.INFO) {
                    getLogger().log(valueOf, "{} {}[name={}, id={}] {}", new Object[]{bulletin.getTimestamp(), bulletin.getSourceType(), bulletin.getSourceName(), bulletin.getSourceName(), bulletin.getMessage()});
                }
            } catch (Exception e) {
                getLogger().warn("Dataflow emitted a bulletin but failed to surface that bulletin due to {}", new Object[]{e.toString(), e});
            }
        }
    }

    private void adjustCounters(ProcessSession processSession, StatelessDataflow statelessDataflow, String str) {
        for (Map.Entry entry : statelessDataflow.getCounters(false).entrySet()) {
            if (((Long) entry.getValue()).longValue() != 0) {
                processSession.adjustCounter(str == null ? (String) entry.getKey() : ((String) entry.getKey()) + str, ((Long) entry.getValue()).longValue(), false);
            }
        }
    }

    private Set<FlowFile> createOutputFlowFiles(TriggerResult triggerResult, ProcessSession processSession, FlowFile flowFile) throws IOException {
        HashSet hashSet = new HashSet();
        try {
            for (Map.Entry entry : triggerResult.getOutputFlowFiles().entrySet()) {
                String str = (String) entry.getKey();
                for (FlowFile flowFile2 : (List) entry.getValue()) {
                    FlowFile create = flowFile == null ? processSession.create() : processSession.create(flowFile);
                    hashSet.add(create);
                    OutputStream write = processSession.write(create);
                    try {
                        InputStream readContent = triggerResult.readContent(flowFile2);
                        try {
                            StreamUtils.copy(readContent, write);
                            if (readContent != null) {
                                readContent.close();
                            }
                            if (write != null) {
                                write.close();
                            }
                            HashMap hashMap = new HashMap(flowFile2.getAttributes());
                            hashMap.put("output.port.name", str);
                            processSession.putAllAttributes(create, hashMap);
                        } finally {
                        }
                    } finally {
                    }
                }
            }
            return hashSet;
        } catch (Exception e) {
            processSession.remove(hashSet);
            throw e;
        }
    }

    private boolean enqueueFlowFile(FlowFile flowFile, StatelessDataflow statelessDataflow, ProcessContext processContext, ProcessSession processSession) {
        long longValue = processContext.getProperty(MAX_INPUT_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
        if (flowFile.getSize() > longValue) {
            getLogger().warn("Will not process {} because its size of {} bytes exceeds the max configured threshold of {} bytes. Routing to failure", new Object[]{flowFile, Long.valueOf(flowFile.getSize()), Long.valueOf(longValue)});
            processSession.transfer(flowFile, REL_FAILURE);
            return false;
        }
        Set inputPortNames = statelessDataflow.getInputPortNames();
        String value = processContext.getProperty(INPUT_PORT).evaluateAttributeExpressions(flowFile).getValue();
        if (value == null || value.trim().isEmpty()) {
            if (inputPortNames.size() != 1) {
                getLogger().error("For {}, determined Input Port Name to be unspecified. Routing to failure.", new Object[]{flowFile});
                processSession.transfer(flowFile, REL_FAILURE);
                return false;
            }
            value = (String) inputPortNames.iterator().next();
        }
        if (!inputPortNames.contains(value)) {
            getLogger().error("For {}, Input Port Name is {}, but that Input Port does not exist in the provided dataflow or is not at the root level. Routing to failure", new Object[]{flowFile, value});
            processSession.transfer(flowFile, REL_FAILURE);
            return false;
        }
        try {
            InputStream read = processSession.read(flowFile);
            try {
                statelessDataflow.enqueue(read, flowFile.getAttributes(), value);
                if (read != null) {
                    read.close();
                }
                return true;
            } finally {
            }
        } catch (IOException e) {
            getLogger().error("Failed to read contents of FlowFile {} into memory. Routing to failure", new Object[]{flowFile});
            processSession.transfer(flowFile, REL_FAILURE);
            return false;
        }
    }

    private DataflowDefinition createDataflowDefinition(ProcessContext processContext, final VersionedFlowSnapshot versionedFlowSnapshot) {
        final VersionedExternalFlow createVersionedExternalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot);
        final ParameterValueProviderDefinition parameterValueProviderDefinition = new ParameterValueProviderDefinition();
        parameterValueProviderDefinition.setType("org.apache.nifi.stateless.parameter.OverrideParameterValueProvider");
        parameterValueProviderDefinition.setName("Parameter Override");
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            if (((PropertyDescriptor) entry.getKey()).isDynamic()) {
                hashMap.put(((PropertyDescriptor) entry.getKey()).getName(), (String) entry.getValue());
            }
        }
        parameterValueProviderDefinition.setPropertyValues(hashMap);
        final Integer asInteger = processContext.getProperty(MAX_INGEST_FLOWFILES).asInteger();
        final Double asDataSize = processContext.getProperty(MAX_INGEST_DATA_SIZE).asDataSize(DataUnit.B);
        final long longValue = processContext.getProperty(DATAFLOW_TIMEOUT).asTimePeriod(TimeUnit.NANOSECONDS).longValue();
        final TransactionThresholds transactionThresholds = new TransactionThresholds(this) { // from class: org.apache.nifi.processors.stateless.ExecuteStateless.1
            public OptionalLong getMaxFlowFiles() {
                return asInteger == null ? OptionalLong.empty() : OptionalLong.of(asInteger.intValue());
            }

            public OptionalLong getMaxContentSize(DataUnit dataUnit) {
                return asDataSize == null ? OptionalLong.empty() : OptionalLong.of(asDataSize.longValue());
            }

            public OptionalLong getMaxTime(TimeUnit timeUnit) {
                return OptionalLong.of(timeUnit.convert(longValue, TimeUnit.NANOSECONDS));
            }
        };
        return new DataflowDefinition() { // from class: org.apache.nifi.processors.stateless.ExecuteStateless.2
            public VersionedExternalFlow getVersionedExternalFlow() {
                return createVersionedExternalFlow;
            }

            public String getFlowName() {
                return versionedFlowSnapshot.getFlowContents().getName();
            }

            public Set<String> getFailurePortNames() {
                return ExecuteStateless.this.failurePortNames;
            }

            public Set<String> getInputPortNames() {
                return Collections.emptySet();
            }

            public Set<String> getOutputPortNames() {
                return ExecuteStateless.this.failurePortNames;
            }

            public List<ParameterContextDefinition> getParameterContexts() {
                return null;
            }

            public List<ReportingTaskDefinition> getReportingTaskDefinitions() {
                return Collections.emptyList();
            }

            public List<ParameterValueProviderDefinition> getParameterValueProviderDefinitions() {
                return Collections.singletonList(parameterValueProviderDefinition);
            }

            public TransactionThresholds getTransactionThresholds() {
                return transactionThresholds;
            }
        };
    }

    private StatelessEngineConfiguration createEngineConfiguration(ProcessContext processContext, int i) {
        SslContextDefinition sslContextDefinition;
        final File file = new File(processContext.getProperty(WORKING_DIRECTORY).getValue());
        final File file2 = new File(processContext.getProperty(LIB_DIRECTORY).getValue());
        final Collection<File> additionalNarDirectories = getAdditionalNarDirectories(processContext.getProperty(ADDITIONAL_LIB_DIRECTORIES).getValue());
        ResourceReference asResource = processContext.getProperty(KRB5_CONF).asResource();
        final File asFile = asResource == null ? null : asResource.asFile();
        SSLContextService asControllerService = processContext.getProperty(STATELESS_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (asControllerService == null) {
            sslContextDefinition = null;
        } else {
            sslContextDefinition = new SslContextDefinition();
            sslContextDefinition.setKeyPass(asControllerService.getKeyPassword());
            sslContextDefinition.setKeystoreFile(asControllerService.getKeyStoreFile());
            sslContextDefinition.setKeystorePass(asControllerService.getKeyStorePassword());
            sslContextDefinition.setKeystoreType(asControllerService.getKeyStoreType());
            sslContextDefinition.setTruststoreFile(asControllerService.getTrustStoreFile());
            sslContextDefinition.setTruststorePass(asControllerService.getTrustStorePassword());
            sslContextDefinition.setTruststoreType(asControllerService.getTrustStoreType());
        }
        File file3 = CONTENT_STORAGE_DISK.getValue().equals(processContext.getProperty(CONTENT_STORAGE_STRATEGY).getValue()) ? new File(new File(new File(file, "execute-stateless-flowfile-content"), getIdentifier()), String.valueOf(i)) : null;
        final String value = processContext.getProperty(STATUS_TASK_INTERVAL).getValue();
        final File file4 = file3;
        final SslContextDefinition sslContextDefinition2 = sslContextDefinition;
        return new StatelessEngineConfiguration() { // from class: org.apache.nifi.processors.stateless.ExecuteStateless.3
            public File getWorkingDirectory() {
                return file;
            }

            public File getNarDirectory() {
                return file2;
            }

            public File getExtensionsDirectory() {
                return file2;
            }

            public Collection<File> getReadOnlyExtensionsDirectories() {
                return additionalNarDirectories;
            }

            public File getKrb5File() {
                return asFile;
            }

            public Optional<File> getContentRepositoryDirectory() {
                return Optional.ofNullable(file4);
            }

            public SslContextDefinition getSslContext() {
                return sslContextDefinition2;
            }

            public String getSensitivePropsKey() {
                return ExecuteStateless.this.getIdentifier();
            }

            public List<ExtensionClientDefinition> getExtensionClients() {
                return Collections.emptyList();
            }

            public boolean isLogExtensionDiscovery() {
                return false;
            }

            public String getStatusTaskInterval() {
                return value;
            }
        };
    }

    public Collection<SearchResult> search(SearchContext searchContext) {
        if (this.flowSnapshot == null) {
            return Collections.emptyList();
        }
        VersionedComponentSearchResults versionedComponentSearchResults = new VersionedComponentSearchResults(searchContext.getSearchTerm());
        Bucket bucket = this.flowSnapshot.getBucket();
        if (bucket != null) {
            versionedComponentSearchResults.add(bucket.getIdentifier(), "Bucket ID");
            versionedComponentSearchResults.add(bucket.getName(), "Bucket Name");
            versionedComponentSearchResults.add(bucket.getDescription(), "Bucket Description");
        }
        VersionedFlow flow = this.flowSnapshot.getFlow();
        if (flow != null) {
            versionedComponentSearchResults.add(flow.getIdentifier(), "Flow ID");
            versionedComponentSearchResults.add(flow.getName(), "Flow Name");
            versionedComponentSearchResults.add(flow.getDescription(), "Flow Description");
        }
        search(this.flowSnapshot.getFlowContents(), versionedComponentSearchResults);
        return versionedComponentSearchResults.toList();
    }

    private void search(VersionedProcessGroup versionedProcessGroup, VersionedComponentSearchResults versionedComponentSearchResults) {
        versionedComponentSearchResults.add(versionedProcessGroup.getName(), "Process Group Name");
        versionedComponentSearchResults.add(versionedProcessGroup.getComments(), "Process Group Comments");
        for (VersionedPort versionedPort : versionedProcessGroup.getInputPorts()) {
            versionedComponentSearchResults.add(versionedPort.getName(), "Input Port Name");
            versionedComponentSearchResults.add(versionedPort.getComments(), "Input Port Comments");
            versionedComponentSearchResults.add(versionedPort.getIdentifier(), "Input Port ID");
        }
        for (VersionedPort versionedPort2 : versionedProcessGroup.getOutputPorts()) {
            versionedComponentSearchResults.add(versionedPort2.getName(), "Output Port Name");
            versionedComponentSearchResults.add(versionedPort2.getComments(), "Output Port Comments");
            versionedComponentSearchResults.add(versionedPort2.getIdentifier(), "Output Port ID");
        }
        Iterator it = versionedProcessGroup.getLabels().iterator();
        while (it.hasNext()) {
            versionedComponentSearchResults.add(((VersionedLabel) it.next()).getLabel(), "Label Text");
        }
        for (VersionedProcessor versionedProcessor : versionedProcessGroup.getProcessors()) {
            versionedComponentSearchResults.add(versionedProcessor.getName(), "Processor Name");
            versionedComponentSearchResults.add(versionedProcessor.getType(), "Processor Type");
            versionedComponentSearchResults.add(versionedProcessor.getIdentifier(), "Processor ID");
            for (Map.Entry entry : versionedProcessor.getProperties().entrySet()) {
                versionedComponentSearchResults.add((String) entry.getKey(), "Processor Property Name");
                versionedComponentSearchResults.add((String) entry.getValue(), "Value of Processor Property " + ((String) entry.getKey()));
            }
            versionedComponentSearchResults.add(versionedProcessor.getComments(), "Processor Comments");
            Bundle bundle = versionedProcessor.getBundle();
            if (bundle != null) {
                versionedComponentSearchResults.add(bundle.getGroup(), "Bundle Group ID for Processor " + versionedProcessor.getType());
                versionedComponentSearchResults.add(bundle.getArtifact(), "Bundle Artifact ID for Processor " + versionedProcessor.getType());
                versionedComponentSearchResults.add(bundle.getVersion(), "Bundle Version for Processor " + versionedProcessor.getType());
            }
        }
        for (VersionedRemoteProcessGroup versionedRemoteProcessGroup : versionedProcessGroup.getRemoteProcessGroups()) {
            versionedComponentSearchResults.add(versionedRemoteProcessGroup.getTargetUris(), "RPG Target URI");
            versionedComponentSearchResults.add(versionedRemoteProcessGroup.getComments(), "RPG Comments");
            versionedComponentSearchResults.add(versionedRemoteProcessGroup.getIdentifier(), "RPG Identifier");
            for (VersionedRemoteGroupPort versionedRemoteGroupPort : versionedRemoteProcessGroup.getInputPorts()) {
                versionedComponentSearchResults.add(versionedRemoteGroupPort.getName(), "RPG Input Port Name");
                versionedComponentSearchResults.add(versionedRemoteGroupPort.getIdentifier(), "RPG Input Port ID");
                versionedComponentSearchResults.add(versionedRemoteGroupPort.getTargetId(), "RPG Input Port Target ID");
            }
            for (VersionedRemoteGroupPort versionedRemoteGroupPort2 : versionedRemoteProcessGroup.getOutputPorts()) {
                versionedComponentSearchResults.add(versionedRemoteGroupPort2.getName(), "RPG Output Port Name");
                versionedComponentSearchResults.add(versionedRemoteGroupPort2.getIdentifier(), "RPG Output Port ID");
                versionedComponentSearchResults.add(versionedRemoteGroupPort2.getTargetId(), "RPG Output Port Target ID");
            }
        }
        versionedComponentSearchResults.add(versionedProcessGroup.getParameterContextName(), "Parameter Context Name");
        for (VersionedConnection versionedConnection : versionedProcessGroup.getConnections()) {
            versionedComponentSearchResults.add(versionedConnection.getIdentifier(), "Connection ID");
            versionedComponentSearchResults.add(versionedConnection.getName(), "Connection Name");
            if (versionedConnection.getSelectedRelationships() != null) {
                versionedComponentSearchResults.add(versionedConnection.getSelectedRelationships().toString(), "Selected Relationships");
            }
            versionedComponentSearchResults.add(versionedConnection.getComments(), "Connection Comments");
        }
        for (VersionedControllerService versionedControllerService : versionedProcessGroup.getControllerServices()) {
            versionedComponentSearchResults.add(versionedControllerService.getName(), "Controller Service Name");
            versionedComponentSearchResults.add(versionedControllerService.getType(), "Controller Service Type");
            versionedComponentSearchResults.add(versionedControllerService.getIdentifier(), "Controller Service ID");
            for (Map.Entry entry2 : versionedControllerService.getProperties().entrySet()) {
                versionedComponentSearchResults.add((String) entry2.getKey(), "Controller Service Property Name");
                versionedComponentSearchResults.add((String) entry2.getValue(), "Value of Controller Service Property " + ((String) entry2.getKey()));
            }
            versionedComponentSearchResults.add(versionedControllerService.getComments(), "Controller Service Comments");
            Bundle bundle2 = versionedControllerService.getBundle();
            if (bundle2 != null) {
                versionedComponentSearchResults.add(bundle2.getGroup(), "Bundle Group ID for Controller Service " + versionedControllerService.getType());
                versionedComponentSearchResults.add(bundle2.getArtifact(), "Bundle Artifact ID for Controller Service " + versionedControllerService.getType());
                versionedComponentSearchResults.add(bundle2.getVersion(), "Bundle Version for Controller Service " + versionedControllerService.getType());
            }
        }
        Iterator it2 = versionedProcessGroup.getProcessGroups().iterator();
        while (it2.hasNext()) {
            search((VersionedProcessGroup) it2.next(), versionedComponentSearchResults);
        }
    }

    private Collection<File> getAdditionalNarDirectories(String str) {
        if (str == null || str.isEmpty()) {
            return Collections.emptyList();
        }
        String[] split = str.split(",");
        ArrayList arrayList = new ArrayList();
        for (String str2 : split) {
            String trim = str2.trim();
            if (!trim.isEmpty()) {
                arrayList.add(new File(trim));
            }
        }
        return arrayList;
    }
}
