package org.apache.nifi.groups;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/groups/StandardDataValve.class */
/**
 * {@link DataValve} implementation that tracks, by Process Group identifier, which groups
 * currently have their valve open for data flowing in and which have it open for data
 * flowing out.
 *
 * <p>Both identifier sets are kept in memory and written to {@link Scope#LOCAL} state through
 * the supplied {@link StateManager} whenever a valve is opened or closed, and are read back
 * once when the valve is constructed.</p>
 *
 * <p>All public operations are {@code synchronized} on this instance.</p>
 */
public class StandardDataValve implements DataValve {
    private static final Logger logger = LoggerFactory.getLogger(StandardDataValve.class);

    /** State key holding the comma-separated identifiers of groups open for inbound data. */
    private static final String GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY = "groupsWithDataFlowingIn";
    /** State key holding the comma-separated identifiers of groups open for outbound data. */
    private static final String GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY = "groupsWithDataFlowingOut";

    private final ProcessGroup processGroup;
    private final StateManager stateManager;
    private final Set<String> groupsWithDataFlowingIn = new HashSet<>();
    private final Set<String> groupsWithDataFlowingOut = new HashSet<>();

    // Set when closeFlowOutOfGroup declined to close because the group still had data queued;
    // cleared whenever an outbound valve is newly opened.
    // NOTE(review): this is a single flag shared by every group handled by this valve, not one per group.
    private boolean leftOpenDueToDataQueued = false;

    /**
     * Reasons for which data is not allowed to flow into a Process Group.
     */
    public enum FlowInForbiddenReason {
        DATA_QUEUED("Process Group already has data queued and valve is not already open to allow data to flow in"),
        OPEN_FOR_OUTPUT("Data Valve is already open to allow data to flow out of group"),
        SOURCE_FLOWING_OUT("Port has an incoming connection from a Process Group that is currently allowing data to flow out");

        private final String explanation;

        FlowInForbiddenReason(String str) {
            this.explanation = str;
        }

        /**
         * @return the human-readable explanation associated with this reason
         */
        public String getExplanation() {
            return this.explanation;
        }
    }

    /**
     * Creates the valve and immediately attempts to recover previously stored state.
     *
     * @param processGroup the Process Group whose child groups are looked up when building diagnostics
     * @param stateManager the State Manager used to load and store the open-valve identifier sets
     */
    public StandardDataValve(ProcessGroup processGroup, StateManager stateManager) {
        this.processGroup = processGroup;
        this.stateManager = stateManager;
        recoverState();
    }

    /**
     * Attempts to open the valve so that data may flow into the given group.
     *
     * @param processGroup the group that data would flow into
     * @return {@code true} if the valve was already open or has now been opened (and state stored);
     *         {@code false} if there is a reason that inbound flow is not allowed
     */
    public synchronized boolean tryOpenFlowIntoGroup(ProcessGroup processGroup) {
        if (groupsWithDataFlowingIn.contains(processGroup.getIdentifier())) {
            logger.debug("Allowing data to flow into {} because valve is already open", processGroup);
            return true;
        }

        final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(processGroup);

        // The outbound valve may have been left open only because data was still queued when a close
        // was requested. If that data is gone now, drop the group from the outbound set. The current
        // attempt is still rejected below because forbiddenReason is non-null.
        // NOTE(review): this removal is not persisted here; it reaches stored state on the next storeState() call.
        final boolean staleOutboundValve = forbiddenReason == FlowInForbiddenReason.OPEN_FOR_OUTPUT
            && leftOpenDueToDataQueued
            && !processGroup.isDataQueued();
        if (staleOutboundValve) {
            groupsWithDataFlowingOut.remove(processGroup.getIdentifier());
        }

        if (forbiddenReason != null) {
            return false;
        }

        logger.debug("Opening valve to allow data to flow into {}", processGroup);
        groupsWithDataFlowingIn.add(processGroup.getIdentifier());
        storeState();
        return true;
    }

    /**
     * Determines why data may not flow into the given group, ignoring whether the inbound valve
     * is already open for it.
     *
     * @param processGroup the group that data would flow into
     * @return the first applicable reason, or {@code null} if inbound flow is allowed
     */
    private FlowInForbiddenReason getReasonFlowIntoGroupNotAllowed(ProcessGroup processGroup) {
        if (processGroup.isDataQueued()) {
            logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", processGroup);
            return FlowInForbiddenReason.DATA_QUEUED;
        }

        if (processGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT
            && groupsWithDataFlowingOut.contains(processGroup.getIdentifier())) {
            logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", processGroup);
            return FlowInForbiddenReason.OPEN_FOR_OUTPUT;
        }

        // Look at every connection feeding the group's input ports: if it originates at the output port
        // of a Batch Output group whose outbound valve is open, inbound flow is refused.
        for (final Port port : processGroup.getInputPorts()) {
            for (final Connection incoming : port.getIncomingConnections()) {
                final Connectable source = incoming.getSource();
                if (source.getConnectableType() != ConnectableType.OUTPUT_PORT) {
                    continue;
                }

                final ProcessGroup sourceGroup = source.getProcessGroup();
                final boolean sourceFlowingOut = sourceGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT
                    && groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier());
                if (sourceFlowingOut) {
                    logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
                        processGroup, port, source);
                    return FlowInForbiddenReason.SOURCE_FLOWING_OUT;
                }
            }
        }

        return null;
    }

    /**
     * Closes the inbound valve for the given group, if it is open.
     *
     * <p>For a group whose FlowFile Concurrency is {@code SINGLE_BATCH_PER_NODE}, the valve is left
     * open when any connection feeding one of the group's input ports still has queued data.</p>
     *
     * @param processGroup the group that data was flowing into
     */
    public synchronized void closeFlowIntoGroup(ProcessGroup processGroup) {
        if (!groupsWithDataFlowingIn.contains(processGroup.getIdentifier())) {
            return;
        }

        if (processGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
            for (final Port port : processGroup.getInputPorts()) {
                for (final Connection connection : port.getIncomingConnections()) {
                    if (!connection.getFlowFileQueue().isEmpty()) {
                        logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", processGroup, connection);
                        return;
                    }
                }
            }
        }

        logger.debug("Closed valve so that data can no longer flow into {}", processGroup);

        // Update the in-memory set BEFORE persisting; storing first would write a snapshot that still
        // lists this group as open, and that stale snapshot would be what is recovered on restart.
        groupsWithDataFlowingIn.remove(processGroup.getIdentifier());
        storeState();
    }

    /**
     * Attempts to open the valve so that data may flow out of the given group.
     *
     * @param processGroup the group that data would flow out of
     * @return {@code true} if the valve was already open or has now been opened (and state stored);
     *         {@code false} if there is a reason that outbound flow is not allowed
     */
    public synchronized boolean tryOpenFlowOutOfGroup(ProcessGroup processGroup) {
        if (groupsWithDataFlowingOut.contains(processGroup.getIdentifier())) {
            logger.debug("Allowing data to flow out of {} because valve is already open", processGroup);
            return true;
        }

        if (getReasonFlowOutOfGroupNotAllowed(processGroup) != null) {
            return false;
        }

        logger.debug("Opening valve to allow data to flow out of {}", processGroup);
        groupsWithDataFlowingOut.add(processGroup.getIdentifier());
        storeState();
        leftOpenDueToDataQueued = false;
        return true;
    }

    /**
     * Determines why data may not flow out of the given group, ignoring whether the outbound valve
     * is already open for it.
     *
     * <p>Only connections from the group's output ports to input ports of groups configured with a
     * FlowFile Concurrency of {@code SINGLE_BATCH_PER_NODE} are considered.</p>
     *
     * @param processGroup the group that data would flow out of
     * @return an explanation of the first applicable reason, or {@code null} if outbound flow is allowed
     */
    private String getReasonFlowOutOfGroupNotAllowed(ProcessGroup processGroup) {
        for (final Port port : processGroup.getOutputPorts()) {
            for (final Connection connection : port.getConnections()) {
                final Connectable destination = connection.getDestination();
                if (destination.getConnectableType() != ConnectableType.INPUT_PORT) {
                    continue;
                }

                final ProcessGroup destinationGroup = destination.getProcessGroup();
                if (destinationGroup.getFlowFileConcurrency() != FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
                    continue;
                }

                if (!connection.getFlowFileQueue().isEmpty()) {
                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is configured with a FlowFileConcurrency of Batch Per Node.",
                        processGroup, port, connection);
                    return "Output Connection already has data queued";
                }

                if (groupsWithDataFlowingIn.contains(destinationGroup.getIdentifier())) {
                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is currently allowing data to flow in",
                        processGroup, port, connection);
                    return "Destination Process Group is allowing data to flow in";
                }
            }
        }

        return null;
    }

    /**
     * Closes the outbound valve for the given group, if it is open.
     *
     * <p>If the group still has data queued the valve is left open and that fact is remembered;
     * otherwise the group is removed from the outbound set and state is stored.</p>
     *
     * @param processGroup the group that data was flowing out of
     */
    public synchronized void closeFlowOutOfGroup(ProcessGroup processGroup) {
        if (!groupsWithDataFlowingOut.contains(processGroup.getIdentifier())) {
            return;
        }

        if (processGroup.isDataQueued()) {
            logger.debug("Triggered to close flow of data out of group {} but group is not empty so will not close valve", processGroup);
            leftOpenDueToDataQueued = true;
            return;
        }

        logger.debug("Closed valve so that data can no longer flow out of {}", processGroup);
        groupsWithDataFlowingOut.remove(processGroup.getIdentifier());
        storeState();
    }

    /**
     * Builds a snapshot describing the current state of the valve.
     *
     * <p>The snapshot resolves the tracked identifiers to Process Groups through the owning group,
     * and buckets each child group of the owning group by the reason inbound and outbound flow is
     * (or is not) allowed.</p>
     *
     * @return diagnostics computed at the time of the call
     */
    public synchronized DataValveDiagnostics getDiagnostics() {
        final Set<ProcessGroup> flowingIn = groupsWithDataFlowingIn.stream()
            .map(this.processGroup::getProcessGroup)
            .collect(Collectors.toSet());
        final Set<ProcessGroup> flowingOut = groupsWithDataFlowingOut.stream()
            .map(this.processGroup::getProcessGroup)
            .collect(Collectors.toSet());

        final Map<String, List<ProcessGroup>> inputReasons = new HashMap<>();
        final Map<String, List<ProcessGroup>> outputReasons = new HashMap<>();

        for (final ProcessGroup childGroup : this.processGroup.getProcessGroups()) {
            final String inputKey;
            if (childGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
                final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(childGroup);
                inputKey = forbiddenReason == null ? "Input is Allowed" : forbiddenReason.getExplanation();
            } else {
                inputKey = "FlowFile Concurrency is " + childGroup.getFlowFileConcurrency();
            }
            inputReasons.computeIfAbsent(inputKey, key -> new ArrayList<>()).add(childGroup);

            final String outputKey;
            if (childGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT) {
                final String forbiddenExplanation = getReasonFlowOutOfGroupNotAllowed(childGroup);
                outputKey = forbiddenExplanation == null ? "Output is Allowed" : forbiddenExplanation;
            } else {
                outputKey = "FlowFile Outbound Policy is " + childGroup.getFlowFileOutboundPolicy();
            }
            outputReasons.computeIfAbsent(outputKey, key -> new ArrayList<>()).add(childGroup);
        }

        return new DataValveDiagnostics() {
            public Set<ProcessGroup> getGroupsWithDataFlowingIn() {
                return flowingIn;
            }

            public Set<ProcessGroup> getGroupsWithDataFlowingOut() {
                return flowingOut;
            }

            public Map<String, List<ProcessGroup>> getReasonForInputNotAllowed() {
                return inputReasons;
            }

            public Map<String, List<ProcessGroup>> getReasonForOutputNotAllowed() {
                return outputReasons;
            }
        };
    }

    /**
     * Loads the two identifier sets from local state into memory.
     *
     * <p>A state version below zero is treated as "nothing stored". Any exception is logged and
     * otherwise ignored, leaving the in-memory sets as they were.</p>
     */
    private synchronized void recoverState() {
        try {
            final StateMap state = stateManager.getState(Scope.LOCAL);
            if (state.getVersion() < 0) {
                logger.debug("No state to recover for {}", this);
                return;
            }

            final List<String> flowingInIds = getIdsForKey(state, GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY);
            final List<String> flowingOutIds = getIdsForKey(state, GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY);

            logger.debug("Recovered state for {}; {} Process Groups have data flowing in ({}); {} Process Groups have data flowing out ({})",
                this, flowingInIds.size(), flowingInIds, flowingOutIds.size(), flowingOutIds);

            groupsWithDataFlowingIn.addAll(flowingInIds);
            groupsWithDataFlowingOut.addAll(flowingOutIds);
        } catch (Exception e) {
            logger.error("Failed to recover state for {}. This could result in Process Groups configured with a FlowFile Concurrency of SINGLE_BATCH_PER_NODE to get data from multiple batches concurrently or stop ingesting data", this, e);
        }
    }

    /**
     * Reads a comma-separated list of identifiers from the given state map.
     *
     * @param stateMap the state to read from
     * @param str the state key whose value should be split
     * @return the identifiers stored under the key, or an empty list if the value is {@code null} or empty
     */
    private List<String> getIdsForKey(StateMap stateMap, String str) {
        final String joinedIds = stateMap.get(str);
        if (joinedIds == null || joinedIds.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(joinedIds.split(","));
    }

    /**
     * Writes both identifier sets to local state as comma-separated strings.
     *
     * <p>Failures are logged and not propagated, so callers proceed with the in-memory state even
     * when persistence fails. This method is not itself synchronized; within this class it is only
     * invoked from synchronized methods.</p>
     */
    private void storeState() {
        final Map<String, String> stateValues = new HashMap<>();
        stateValues.put(GROUPS_WITH_DATA_FLOWING_IN_STATE_KEY, StringUtils.join(groupsWithDataFlowingIn, ","));
        stateValues.put(GROUPS_WITH_DATA_FLOWING_OUT_STATE_KEY, StringUtils.join(groupsWithDataFlowingOut, ","));

        try {
            stateManager.setState(stateValues, Scope.LOCAL);
        } catch (Exception e) {
            logger.error("Failed to store state for {}. If NiFi is restarted before state is properly stored, this could result Process Groups configured with a FlowFile Concurrency of SINGLE_BATCH_PER_NODE to get data from multiple batches concurrently or stop ingesting data", this, e);
        }
    }

    @Override
    public String toString() {
        return "StandardDataValve[group=" + this.processGroup + "]";
    }
}
