package org.apache.nifi.stateless.engine;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;

/* loaded from: input_file:org/apache/nifi/stateless/engine/StandardExecutionProgress.class */
/**
 * {@link ExecutionProgress} implementation that publishes the outcome of a dataflow execution to a
 * caller-supplied result queue and then waits until that outcome is acknowledged, or until the
 * execution is reported as canceled or failed.
 *
 * <p>The hand-off uses two queues. {@link #awaitCompletionAction()} offers a {@link TriggerResult} to
 * {@code resultQueue} and then blocks on an internal queue of
 * {@link ExecutionProgress.CompletionAction}s. That internal queue receives {@code COMPLETE} when the
 * published result's {@code acknowledge()} is invoked, and {@code CANCEL} from
 * {@link #notifyExecutionCanceled()} and {@link #notifyExecutionFailed(Throwable)}.</p>
 */
public class StandardExecutionProgress implements ExecutionProgress {
    private final ProcessGroup rootGroup;
    private final List<FlowFileQueue> internalFlowFileQueues;
    private final ByteArrayContentRepository contentRepository;
    private final BlockingQueue<TriggerResult> resultQueue;
    private final Set<String> failurePortNames;
    private final AsynchronousCommitTracker commitTracker;
    private final StatelessStateManagerProvider stateManagerProvider;
    private final BlockingQueue<ExecutionProgress.CompletionAction> completionActionQueue = new LinkedBlockingQueue<>();

    private volatile boolean canceled = false;
    // Caches the first action taken from completionActionQueue so later callers do not block again.
    private volatile ExecutionProgress.CompletionAction completionAction = null;

    /**
     * Creates the progress tracker for a single execution.
     *
     * @param processGroup the group whose output ports are drained when a result is created
     * @param list the queues consulted by {@link #isDataQueued()}
     * @param blockingQueue the queue to which the {@link TriggerResult} is offered
     * @param byteArrayContentRepository the repository used to read FlowFile content and to decrement
     *                                   claimant counts for drained FlowFiles
     * @param set names of output ports that cause a {@link FailurePortEncounteredException} if any
     *            FlowFile was drained from them
     * @param asynchronousCommitTracker the tracker whose success/failure callbacks are triggered on
     *                                  acknowledge, cancel, or failure
     * @param statelessStateManagerProvider the provider whose updates are committed on acknowledge and
     *                                      rolled back on cancel or failure
     */
    public StandardExecutionProgress(ProcessGroup processGroup, List<FlowFileQueue> list, BlockingQueue<TriggerResult> blockingQueue, ByteArrayContentRepository byteArrayContentRepository, Set<String> set, AsynchronousCommitTracker asynchronousCommitTracker, StatelessStateManagerProvider statelessStateManagerProvider) {
        this.rootGroup = processGroup;
        this.internalFlowFileQueues = list;
        this.resultQueue = blockingQueue;
        this.contentRepository = byteArrayContentRepository;
        this.failurePortNames = set;
        this.commitTracker = asynchronousCommitTracker;
        this.stateManagerProvider = statelessStateManagerProvider;
    }

    /**
     * @return {@code true} once {@link #notifyExecutionCanceled()} has been called
     */
    @Override
    public boolean isCanceled() {
        return canceled;
    }

    /**
     * @return {@code true} if at least one of the internal queues reports a non-empty active queue
     */
    @Override
    public boolean isDataQueued() {
        for (final FlowFileQueue queue : internalFlowFileQueues) {
            if (!queue.isActiveQueueEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Determines how the execution should complete.
     *
     * <p>Returns {@code CANCEL} immediately if the execution was canceled, or the previously decided
     * action if there is one. Otherwise a {@link TriggerResult} is created and offered to the result
     * queue, and the call blocks until a completion action becomes available.</p>
     *
     * @return the completion action for this execution
     * @throws InterruptedException if interrupted while waiting for a completion action
     * @throws FailurePortEncounteredException if a FlowFile was drained from a port marked as a failure port
     */
    @Override
    public ExecutionProgress.CompletionAction awaitCompletionAction() throws InterruptedException {
        if (canceled) {
            return ExecutionProgress.CompletionAction.CANCEL;
        }

        // Read the volatile field once so the null check and the return see the same value.
        final ExecutionProgress.CompletionAction existingAction = completionAction;
        if (existingAction != null) {
            return existingAction;
        }

        final TriggerResult triggerResult = createResult();
        resultQueue.offer(triggerResult);

        final ExecutionProgress.CompletionAction decidedAction = completionActionQueue.take();
        // Remember the decision so that subsequent calls return it instead of publishing another result.
        completionAction = decidedAction;
        return decidedAction;
    }

    /**
     * Drains all output ports and wraps the drained FlowFiles in a {@link TriggerResult}.
     *
     * @return a result that reports success and exposes the drained FlowFiles by port name
     * @throws FailurePortEncounteredException if any failure port received at least one FlowFile
     */
    private TriggerResult createResult() {
        final Map<String, List<FlowFile>> outputFlowFiles = drainOutputQueues();

        for (final String failurePortName : failurePortNames) {
            final List<FlowFile> failedFlowFiles = outputFlowFiles.get(failurePortName);
            if (failedFlowFiles != null && !failedFlowFiles.isEmpty()) {
                throw new FailurePortEncounteredException("FlowFile was transferred to Port " + failurePortName + ", which is marked as a Failure Port");
            }
        }

        // Captured at creation time: the result keeps reporting this value even if the flag changes later.
        final boolean canceledAtCreation = isCanceled();

        return new TriggerResult() {
            public boolean isSuccessful() {
                return true;
            }

            public boolean isCanceled() {
                return canceledAtCreation;
            }

            public Optional<Throwable> getFailureCause() {
                return Optional.empty();
            }

            public Map<String, List<FlowFile>> getOutputFlowFiles() {
                return outputFlowFiles;
            }

            public List<FlowFile> getOutputFlowFiles(String portName) {
                // Note: an unknown port name is recorded in the map with an empty list as a side effect.
                return outputFlowFiles.computeIfAbsent(portName, name -> Collections.emptyList());
            }

            public byte[] readContent(FlowFile flowFile) {
                if (!(flowFile instanceof FlowFileRecord)) {
                    throw new IllegalArgumentException("FlowFile was not created by this flow");
                }

                final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
                final byte[] claimContents = contentRepository.getBytes(flowFileRecord.getContentClaim());
                final long offset = flowFileRecord.getContentClaimOffset();
                final long size = flowFileRecord.getSize();

                // When the FlowFile covers the whole claim, hand back the repository's array without copying.
                if (offset == 0 && size == claimContents.length) {
                    return claimContents;
                }

                return Arrays.copyOfRange(claimContents, (int) offset, (int) (size + offset));
            }

            public void acknowledge() {
                commitTracker.triggerCallbacks();
                stateManagerProvider.commitUpdates();
                // Releases the caller blocked in awaitCompletionAction().
                completionActionQueue.offer(ExecutionProgress.CompletionAction.COMPLETE);
            }
        };
    }

    /**
     * Marks the execution as canceled, triggers failure callbacks with a "Dataflow Canceled" exception,
     * rolls back state updates, and enqueues a {@code CANCEL} completion action.
     */
    @Override
    public void notifyExecutionCanceled() {
        canceled = true;
        commitTracker.triggerFailureCallbacks(new RuntimeException("Dataflow Canceled"));
        stateManagerProvider.rollbackUpdates();
        completionActionQueue.offer(ExecutionProgress.CompletionAction.CANCEL);
    }

    /**
     * Triggers failure callbacks with the given cause, rolls back state updates, and enqueues a
     * {@code CANCEL} completion action. Unlike {@link #notifyExecutionCanceled()}, this does not set
     * the canceled flag.
     *
     * @param th the cause of the failure, passed to the failure callbacks
     */
    @Override
    public void notifyExecutionFailed(Throwable th) {
        commitTracker.triggerFailureCallbacks(th);
        stateManagerProvider.rollbackUpdates();
        completionActionQueue.offer(ExecutionProgress.CompletionAction.CANCEL);
    }

    /**
     * Drains the queues feeding every output port of the root group.
     *
     * @return a new mutable map from output port name to the FlowFiles drained for that port
     */
    public Map<String, List<FlowFile>> drainOutputQueues() {
        final Map<String, List<FlowFile>> flowFilesByPort = new HashMap<>();
        for (final Port port : rootGroup.getOutputPorts()) {
            flowFilesByPort.put(port.getName(), drainOutputQueues(port));
        }
        return flowFilesByPort;
    }

    /**
     * Drains every incoming connection of the given port and decrements the claimant count of each
     * drained FlowFile's content claim.
     *
     * @param port the port whose incoming connections are drained
     * @return the drained FlowFiles in connection order; an immutable empty list if the port has no
     *         incoming connections
     */
    private List<FlowFile> drainOutputQueues(Port port) {
        final List<Connection> incomingConnections = port.getIncomingConnections();
        if (incomingConnections.isEmpty()) {
            return Collections.emptyList();
        }

        final List<FlowFile> portFlowFiles = new ArrayList<>();
        for (final Connection connection : incomingConnections) {
            final DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue) connection.getFlowFileQueue();
            final List<FlowFileRecord> drainedRecords = new ArrayList<>(flowFileQueue.size().getObjectCount());
            flowFileQueue.drainTo(drainedRecords);
            portFlowFiles.addAll(drainedRecords);

            for (final FlowFileRecord drainedRecord : drainedRecords) {
                contentRepository.decrementClaimantCount(drainedRecord.getContentClaim());
            }
        }
        return portFlowFiles;
    }
}
