package org.apache.nifi.stateless.flow;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StandardExecutionProgress;
import org.apache.nifi.stateless.flow.StandardStatelessFlowCurrent;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.Connectables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/stateless/flow/StandardStatelessFlow.class */
public class StandardStatelessFlow implements StatelessDataflow {
    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class);
    private static final long COMPONENT_ENABLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
    private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
    private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
    private final ProcessGroup rootGroup;
    private final List<Connection> allConnections;
    private final List<ReportingTaskNode> reportingTasks;
    private final ControllerServiceProvider controllerServiceProvider;
    private final ProcessContextFactory processContextFactory;
    private final RepositoryContextFactory repositoryContextFactory;
    private final List<FlowFileQueue> internalFlowFileQueues;
    private final DataflowDefinition dataflowDefinition;
    private final StatelessStateManagerProvider stateManagerProvider;
    private final ProcessScheduler processScheduler;
    private final TransactionThresholdMeter transactionThresholdMeter;
    private final BulletinRepository bulletinRepository;
    private volatile ExecutorService runDataflowExecutor;
    private volatile ScheduledExecutorService backgroundTaskExecutor;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final AsynchronousCommitTracker tracker = new AsynchronousCommitTracker();
    private final List<BackgroundTask> backgroundTasks = new ArrayList();
    private volatile boolean initialized = false;
    private volatile Boolean stateful = null;
    private final Set<Connectable> rootConnectables = new HashSet();

    /* loaded from: input_file:org/apache/nifi/stateless/flow/StandardStatelessFlow$BackgroundTask.class */
    private static class BackgroundTask {
        private final Runnable task;
        private final long schedulingPeriod;
        private final TimeUnit schedulingUnit;

        public BackgroundTask(Runnable runnable, long j, TimeUnit timeUnit) {
            this.task = runnable;
            this.schedulingPeriod = j;
            this.schedulingUnit = timeUnit;
        }

        public Runnable getTask() {
            return this.task;
        }

        public long getSchedulingPeriod() {
            return this.schedulingPeriod;
        }

        public TimeUnit getSchedulingUnit() {
            return this.schedulingUnit;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/stateless/flow/StandardStatelessFlow$SerializableStateMap.class */
    public static class SerializableStateMap {
        private long version;
        private Map<String, String> stateValues;

        private SerializableStateMap() {
        }

        public long getVersion() {
            return this.version;
        }

        public void setVersion(long j) {
            this.version = j;
        }

        public Map<String, String> getStateValues() {
            return this.stateValues;
        }

        public void setStateValues(Map<String, String> map) {
            this.stateValues = map;
        }
    }

    public StandardStatelessFlow(ProcessGroup processGroup, List<ReportingTaskNode> list, ControllerServiceProvider controllerServiceProvider, ProcessContextFactory processContextFactory, RepositoryContextFactory repositoryContextFactory, DataflowDefinition dataflowDefinition, StatelessStateManagerProvider statelessStateManagerProvider, ProcessScheduler processScheduler, BulletinRepository bulletinRepository) {
        this.rootGroup = processGroup;
        this.allConnections = processGroup.findAllConnections();
        this.reportingTasks = list;
        this.controllerServiceProvider = controllerServiceProvider;
        this.processContextFactory = processContextFactory;
        this.repositoryContextFactory = repositoryContextFactory;
        this.dataflowDefinition = dataflowDefinition;
        this.stateManagerProvider = statelessStateManagerProvider;
        this.processScheduler = processScheduler;
        this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
        this.bulletinRepository = bulletinRepository;
        discoverRootProcessors(processGroup, this.rootConnectables);
        discoverRootRemoteGroupPorts(processGroup, this.rootConnectables);
        discoverRootInputPorts(processGroup, this.rootConnectables);
        this.internalFlowFileQueues = discoverInternalFlowFileQueues(processGroup);
    }

    private List<FlowFileQueue> discoverInternalFlowFileQueues(ProcessGroup processGroup) {
        Set inputPorts = this.rootGroup.getInputPorts();
        Set outputPorts = this.rootGroup.getOutputPorts();
        return (List) processGroup.findAllConnections().stream().filter(connection -> {
            return !inputPorts.contains(connection.getSource());
        }).filter(connection2 -> {
            return !outputPorts.contains(connection2.getDestination());
        }).map((v0) -> {
            return v0.getFlowFileQueue();
        }).distinct().collect(Collectors.toCollection(ArrayList::new));
    }

    private void discoverRootInputPorts(ProcessGroup processGroup, Set<Connectable> set) {
        Iterator it = processGroup.getInputPorts().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Port) it.next()).getConnections().iterator();
            while (it2.hasNext()) {
                Connectable destination = ((Connection) it2.next()).getDestination();
                if (!isTerminalPort(destination)) {
                    set.add(destination);
                }
            }
        }
    }

    private void discoverRootProcessors(ProcessGroup processGroup, Set<Connectable> set) {
        for (ProcessorNode processorNode : processGroup.findAllProcessors()) {
            if (!Connectables.hasNonLoopConnection(processorNode)) {
                set.add(processorNode);
            }
        }
    }

    private void discoverRootRemoteGroupPorts(ProcessGroup processGroup, Set<Connectable> set) {
        Iterator it = processGroup.findAllRemoteProcessGroups().iterator();
        while (it.hasNext()) {
            for (RemoteGroupPort remoteGroupPort : ((RemoteProcessGroup) it.next()).getOutputPorts()) {
                if (!remoteGroupPort.getConnections().isEmpty()) {
                    set.add(remoteGroupPort);
                }
            }
        }
    }

    public static boolean isTerminalPort(Connectable connectable) {
        if (connectable.getConnectableType() != ConnectableType.OUTPUT_PORT || !PARENT_FLOW_GROUP_ID.equals(connectable.getProcessGroup().getIdentifier())) {
            return false;
        }
        logger.debug("FlowFiles queued for {} but this is a Terminal Port. Will not trigger Port to run.", connectable);
        return true;
    }

    public void initialize() {
        if (this.initialized) {
            logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this);
            return;
        }
        this.initialized = true;
        performValidation();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            enableControllerServices(this.rootGroup);
            waitForServicesEnabled(this.rootGroup);
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            long currentTimeMillis3 = System.currentTimeMillis();
            StatelessDataflowValidation performValidation = performValidation();
            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
            if (!performValidation.isValid()) {
                logger.warn("{} Attempting to initialize dataflow but found at least one invalid component: {}", this, performValidation);
            }
            startProcessors(this.rootGroup);
            startRemoteGroups(this.rootGroup);
            startReportingTasks();
            logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis3), Long.valueOf(currentTimeMillis4), Long.valueOf(currentTimeMillis2)});
            String flowName = this.dataflowDefinition.getFlowName();
            this.runDataflowExecutor = Executors.newFixedThreadPool(1, createNamedThreadFactory((flowName == null || flowName.trim().isEmpty()) ? "Run Dataflow" : "Run Dataflow " + flowName, false));
            this.backgroundTaskExecutor = Executors.newScheduledThreadPool(1, createNamedThreadFactory("Background Tasks", true));
            this.backgroundTasks.forEach(backgroundTask -> {
                this.backgroundTaskExecutor.scheduleWithFixedDelay(backgroundTask.getTask(), backgroundTask.getSchedulingPeriod(), backgroundTask.getSchedulingPeriod(), backgroundTask.getSchedulingUnit());
            });
        } catch (Throwable th) {
            this.processScheduler.shutdown();
            throw th;
        }
    }

    private ThreadFactory createNamedThreadFactory(String str, boolean z) {
        return runnable -> {
            Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
            newThread.setName(str);
            newThread.setDaemon(z);
            return newThread;
        };
    }

    public void scheduleBackgroundTask(Runnable runnable, long j, TimeUnit timeUnit) {
        this.backgroundTasks.add(new BackgroundTask(runnable, j, timeUnit));
    }

    private void waitForServicesEnabled(ProcessGroup processGroup) {
        long currentTimeMillis = System.currentTimeMillis() + COMPONENT_ENABLE_TIMEOUT_MILLIS;
        Iterator it = processGroup.findAllControllerServices().iterator();
        while (it.hasNext()) {
            try {
                if (!((ControllerServiceNode) it.next()).awaitEnabled(currentTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS) && System.currentTimeMillis() > currentTimeMillis) {
                    throw new IllegalStateException("At least one Controller Service never finished enabling. All validation errors: " + performValidation().toString());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for Controller Services to enable", e);
            }
        }
    }

    private void startReportingTasks() {
        this.reportingTasks.forEach(this::startReportingTask);
    }

    private void startReportingTask(ReportingTaskNode reportingTaskNode) {
        this.processScheduler.schedule(reportingTaskNode);
    }

    public void shutdown() {
        if (this.runDataflowExecutor != null) {
            this.runDataflowExecutor.shutdown();
        }
        if (this.backgroundTaskExecutor != null) {
            this.backgroundTaskExecutor.shutdown();
        }
        this.rootGroup.stopProcessing();
        this.rootGroup.findAllRemoteProcessGroups().forEach((v0) -> {
            v0.shutdown();
        });
        this.rootGroup.shutdown();
        Set findAllControllerServices = this.rootGroup.findAllControllerServices();
        this.controllerServiceProvider.disableControllerServicesAsync(findAllControllerServices);
        List<ReportingTaskNode> list = this.reportingTasks;
        ProcessScheduler processScheduler = this.processScheduler;
        processScheduler.getClass();
        list.forEach(processScheduler::unschedule);
        this.stateManagerProvider.shutdown();
        findAllControllerServices.forEach(controllerServiceNode -> {
            this.processScheduler.shutdownControllerService(controllerServiceNode, this.controllerServiceProvider);
        });
        List<ReportingTaskNode> list2 = this.reportingTasks;
        ProcessScheduler processScheduler2 = this.processScheduler;
        processScheduler2.getClass();
        list2.forEach(processScheduler2::shutdownReportingTask);
        this.processScheduler.shutdown();
        this.repositoryContextFactory.shutdown();
    }

    public StatelessDataflowValidation performValidation() {
        HashMap hashMap = new HashMap();
        Iterator it = this.rootGroup.findAllControllerServices().iterator();
        while (it.hasNext()) {
            performValidation((ControllerServiceNode) it.next(), hashMap);
        }
        Iterator it2 = this.rootGroup.findAllProcessors().iterator();
        while (it2.hasNext()) {
            performValidation((ProcessorNode) it2.next(), hashMap);
        }
        return new StandardStatelessDataflowValidation(hashMap);
    }

    private void performValidation(ComponentNode componentNode, Map<ComponentNode, List<ValidationResult>> map) {
        if (componentNode.performValidation() == ValidationStatus.VALID) {
            return;
        }
        Collection<ValidationResult> validationErrors = componentNode.getValidationErrors();
        ArrayList arrayList = new ArrayList();
        for (ValidationResult validationResult : validationErrors) {
            if (!validationResult.isValid()) {
                arrayList.add(validationResult);
            }
        }
        map.put(componentNode, arrayList);
    }

    private void enableControllerServices(ProcessGroup processGroup) {
        for (ControllerServiceNode controllerServiceNode : processGroup.getControllerServices(false)) {
            try {
                this.controllerServiceProvider.enableControllerServiceAndDependencies(controllerServiceNode).get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("Controller Service " + controllerServiceNode + " has not fully enabled. Current Validation Status is " + controllerServiceNode.getValidationStatus() + " with validation Errors: " + controllerServiceNode.getValidationErrors(), e);
            }
        }
        processGroup.getProcessGroups().forEach(this::enableControllerServices);
    }

    private void startProcessors(ProcessGroup processGroup) {
        Collection<ProcessorNode> processors = processGroup.getProcessors();
        HashMap hashMap = new HashMap(processors.size());
        for (ProcessorNode processorNode : processors) {
            hashMap.put(processorNode, processGroup.startProcessor(processorNode, true));
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            ProcessorNode processorNode2 = (ProcessorNode) entry.getKey();
            Future future = (Future) entry.getValue();
            long currentTimeMillis = System.currentTimeMillis();
            try {
                future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                logger.debug("Waited {} millis for {} to start", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), processorNode2);
            } catch (Exception e) {
                throw new IllegalStateException("Processor " + processorNode2 + " has not fully enabled. Current Validation Status is " + processorNode2.getValidationStatus() + ". All validation errors: " + performValidation().toString());
            }
        }
        processGroup.getProcessGroups().forEach(this::startProcessors);
    }

    private void startRemoteGroups(ProcessGroup processGroup) {
        List findAllRemoteProcessGroups = processGroup.findAllRemoteProcessGroups();
        findAllRemoteProcessGroups.forEach((v0) -> {
            v0.initialize();
        });
        findAllRemoteProcessGroups.forEach((v0) -> {
            v0.startTransmitting();
        });
    }

    public DataflowTrigger trigger(DataflowTriggerContext dataflowTriggerContext) {
        if (!this.initialized) {
            throw new IllegalStateException("Must initialize dataflow before triggering it");
        }
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final StandardExecutionProgress standardExecutionProgress = new StandardExecutionProgress(this.rootGroup, this.internalFlowFileQueues, linkedBlockingQueue, this.repositoryContextFactory, this.dataflowDefinition.getFailurePortNames(), this.tracker, this.stateManagerProvider, dataflowTriggerContext, this::purge);
        final AtomicReference atomicReference = new AtomicReference();
        DataflowTrigger dataflowTrigger = new DataflowTrigger() { // from class: org.apache.nifi.stateless.flow.StandardStatelessFlow.1
            public void cancel() {
                standardExecutionProgress.notifyExecutionCanceled();
                Future future = (Future) atomicReference.get();
                if (future != null) {
                    future.cancel(true);
                }
            }

            public Optional<TriggerResult> getResultNow() {
                return Optional.ofNullable((TriggerResult) linkedBlockingQueue.poll());
            }

            public Optional<TriggerResult> getResult(long j, TimeUnit timeUnit) throws InterruptedException {
                return Optional.ofNullable((TriggerResult) linkedBlockingQueue.poll(j, timeUnit));
            }

            public TriggerResult getResult() throws InterruptedException {
                return (TriggerResult) linkedBlockingQueue.take();
            }
        };
        atomicReference.set(this.runDataflowExecutor.submit(() -> {
            executeDataflow(linkedBlockingQueue, standardExecutionProgress, this.tracker);
        }));
        return dataflowTrigger;
    }

    private void executeDataflow(BlockingQueue<TriggerResult> blockingQueue, ExecutionProgress executionProgress, AsynchronousCommitTracker asynchronousCommitTracker) {
        long nanoTime = System.nanoTime();
        this.transactionThresholdMeter.reset();
        try {
            new StandardStatelessFlowCurrent.Builder().commitTracker(asynchronousCommitTracker).executionProgress(executionProgress).processContextFactory(this.processContextFactory).repositoryContextFactory(this.repositoryContextFactory).rootConnectables(this.rootConnectables).transactionThresholdMeter(this.transactionThresholdMeter).build().triggerFlow();
            logger.debug("Completed triggering of components in dataflow. Will now wait for acknowledgment");
            switch (executionProgress.awaitCompletionAction()) {
                case CANCEL:
                    logger.debug("Dataflow was canceled");
                    purge();
                    break;
                case COMPLETE:
                default:
                    if (logger.isDebugEnabled()) {
                        long nanoTime2 = System.nanoTime() - nanoTime;
                        logger.debug("Ran dataflow in {}", nanoTime2 > TEN_MILLIS_IN_NANOS ? TimeUnit.NANOSECONDS.toMillis(nanoTime2) + " millis" : NumberFormat.getInstance().format(nanoTime2) + " nanos");
                        break;
                    }
                    break;
            }
        } catch (TerminatedTaskException e) {
            logger.debug("Caught a TerminatedTaskException", e);
            purge();
            asynchronousCommitTracker.triggerFailureCallbacks(e);
            this.stateManagerProvider.rollbackUpdates();
            blockingQueue.offer(new CanceledTriggerResult());
        } catch (Throwable th) {
            logger.error("Failed to execute dataflow", th);
            purge();
            asynchronousCommitTracker.triggerFailureCallbacks(th);
            this.stateManagerProvider.rollbackUpdates();
            blockingQueue.offer(new ExceptionalTriggerResult(th));
        }
    }

    public boolean isStateful() {
        if (this.stateful == null) {
            if (this.reportingTasks.stream().anyMatch(this::isStateful)) {
                return true;
            }
            this.stateful = Boolean.valueOf(isStateful(this.rootGroup));
        }
        return this.stateful.booleanValue();
    }

    private boolean isStateful(ProcessGroup processGroup) {
        if (processGroup.getProcessors().stream().anyMatch(this::isStateful) || processGroup.getControllerServices(false).stream().anyMatch(this::isStateful)) {
            return true;
        }
        return processGroup.getProcessGroups().stream().anyMatch(this::isStateful);
    }

    private boolean isStateful(ProcessorNode processorNode) {
        return processorNode.getProcessor().isStateful(this.processContextFactory.createProcessContext(processorNode));
    }

    private boolean isStateful(ControllerServiceNode controllerServiceNode) {
        return controllerServiceNode.getControllerServiceImplementation().isStateful(new StandardConfigurationContext(controllerServiceNode, this.controllerServiceProvider, (String) null, this.rootGroup.getVariableRegistry()));
    }

    private boolean isStateful(ReportingTaskNode reportingTaskNode) {
        return reportingTaskNode.getReportingTask().isStateful(reportingTaskNode.getReportingContext());
    }

    public Set<String> getInputPortNames() {
        return (Set) this.rootGroup.getInputPorts().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
    }

    public Set<String> getOutputPortNames() {
        return (Set) this.rootGroup.getOutputPorts().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
    }

    public QueueSize enqueue(byte[] bArr, Map<String, String> map, String str) {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
            Throwable th = null;
            try {
                try {
                    QueueSize enqueue = enqueue(byteArrayInputStream, map, str);
                    if (byteArrayInputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayInputStream.close();
                        }
                    }
                    return enqueue;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new FlowFileAccessException("Failed to enqueue FlowFile", e);
        }
    }

    public QueueSize enqueue(InputStream inputStream, Map<String, String> map, String str) {
        Connectable inputPortByName = this.rootGroup.getInputPortByName(str);
        if (inputPortByName == null) {
            throw new IllegalArgumentException("No Input Port exists with name <" + str + ">. Valid Port names are " + getInputPortNames());
        }
        ProcessSession createSession = new StandardProcessSessionFactory(this.repositoryContextFactory.createRepositoryContext(inputPortByName), () -> {
            return false;
        }).createSession();
        try {
            Set connections = inputPortByName.getConnections();
            if (connections.isEmpty()) {
                throw new IllegalStateException("Cannot enqueue data for Input Port <" + str + "> because it has no outgoing connections");
            }
            createSession.transfer(createSession.putAllAttributes(createSession.write(createSession.create(), outputStream -> {
                StreamUtils.copy(inputStream, outputStream);
            }), map), LocalPort.PORT_RELATIONSHIP);
            createSession.commitAsync();
            return ((Connection) connections.iterator().next()).getFlowFileQueue().size();
        } catch (Throwable th) {
            createSession.rollback();
            throw th;
        }
    }

    public boolean isFlowFileQueued() {
        Iterator<Connection> it = this.allConnections.iterator();
        while (it.hasNext()) {
            if (!it.next().getFlowFileQueue().isActiveQueueEmpty()) {
                return true;
            }
        }
        return false;
    }

    public void purge() {
        ArrayList arrayList = new ArrayList();
        Iterator<Connection> it = this.allConnections.iterator();
        while (it.hasNext()) {
            ((DrainableFlowFileQueue) it.next().getFlowFileQueue()).drainTo(arrayList);
            arrayList.clear();
        }
        this.repositoryContextFactory.getContentRepository().purge();
    }

    public Map<String, String> getComponentStates(Scope scope) {
        return serializeStateMaps(this.stateManagerProvider.getAllComponentStates(scope));
    }

    private Map<String, String> serializeStateMaps(Map<String, StateMap> map) {
        if (map == null) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, StateMap> entry : map.entrySet()) {
            String key = entry.getKey();
            StateMap value = entry.getValue();
            if (value.getVersion() != -1) {
                SerializableStateMap serializableStateMap = new SerializableStateMap();
                serializableStateMap.setStateValues(value.toMap());
                serializableStateMap.setVersion(value.getVersion());
                try {
                    hashMap.put(key, this.objectMapper.writeValueAsString(serializableStateMap));
                } catch (Exception e) {
                    throw new RuntimeException("Failed to serialize components' state maps as Strings", e);
                }
            }
        }
        return hashMap;
    }

    public void setComponentStates(Map<String, String> map, Scope scope) {
        this.stateManagerProvider.updateComponentsStates(deserializeStateMaps(map), scope);
    }

    private Map<String, StateMap> deserializeStateMaps(Map<String, String> map) {
        if (map == null) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            try {
                SerializableStateMap serializableStateMap = (SerializableStateMap) this.objectMapper.readValue(entry.getValue(), SerializableStateMap.class);
                hashMap.put(key, new StandardStateMap(serializableStateMap.getStateValues(), serializableStateMap.getVersion()));
            } catch (Exception e) {
                logger.error("Failed to deserialized components' state for component with ID {}. State will be reset to empty", key, e);
            }
        }
        return hashMap;
    }

    public boolean isSourcePrimaryNodeOnly() {
        Iterator<Connectable> it = this.rootConnectables.iterator();
        while (it.hasNext()) {
            if (it.next().isIsolated()) {
                return true;
            }
        }
        return false;
    }

    public long getSourceYieldExpiration() {
        long j = 0;
        Iterator<Connectable> it = this.rootConnectables.iterator();
        while (it.hasNext()) {
            j = Math.max(j, it.next().getYieldExpiration());
        }
        return j;
    }

    public void resetCounters() {
        CounterRepository counterRepository = this.repositoryContextFactory.getCounterRepository();
        counterRepository.getCounters().forEach(counter -> {
            counterRepository.resetCounter(counter.getIdentifier());
        });
    }

    public Map<String, Long> getCounters(boolean z) {
        HashMap hashMap = new HashMap();
        for (Counter counter : this.repositoryContextFactory.getCounterRepository().getCounters()) {
            boolean z2 = !counter.getContext().endsWith(")");
            if (z || !z2) {
                hashMap.put(z2 ? counter.getName() : counter.getName() + " - " + counter.getContext(), Long.valueOf(counter.getValue()));
            }
        }
        return hashMap;
    }

    public BulletinRepository getBulletinRepository() {
        return this.bulletinRepository;
    }

    public Set<Processor> findAllProcessors() {
        return (Set) this.rootGroup.findAllProcessors().stream().map((v0) -> {
            return v0.getProcessor();
        }).collect(Collectors.toSet());
    }
}
