/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.atlas.provenance.lineage;

import java.util.List;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.lineage.AbstractLineageStrategy;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.lineage.ComputeLineageResult;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;

public class SimpleFlowPathLineage
extends AbstractLineageStrategy {
    @Override
    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
        DataSetRefs refs = this.executeAnalyzer(analysisContext, event);
        if (refs == null || refs.isEmpty()) {
            return;
        }
        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
            this.processRemotePortEvent(analysisContext, nifiFlow, event, refs);
        } else {
            this.addDataSetRefs(nifiFlow, refs);
        }
    }

    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
        boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
        Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
        String portProcessId = event.getComponentId();
        NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
        remotePortProcess.setName(event.getComponentType());
        remotePortProcess.addProcessor(portProcessId);
        if (isRemoteInputPort) {
            ProvenanceEventRecord previousEvent = this.findPreviousProvenanceEvent(analysisContext, event);
            if (previousEvent == null) {
                this.logger.warn("Previous event was not found: {}", new Object[]{event});
                return;
            }
            List<ConnectionStatus> incomingConnections = nifiFlow.getIncomingConnections(portProcessId);
            if (incomingConnections == null || incomingConnections.isEmpty()) {
                this.logger.warn("Incoming relationship was not found: {}", new Object[]{event});
                return;
            }
            ConnectionStatus connection = incomingConnections.get(0);
            remotePortProcess.setGroupId(connection.getGroupId());
            Referenceable remotePortProcessRef = this.toReferenceable(remotePortProcess, nifiFlow);
            this.createEntity(remotePortProcessRef);
            Referenceable queueFromStaticFlowPathToRemotePortProcess = new Referenceable("nifi_queue", new String[0]);
            queueFromStaticFlowPathToRemotePortProcess.set("name", (Object)"queue");
            queueFromStaticFlowPathToRemotePortProcess.set("qualifiedName", (Object)nifiFlow.toQualifiedName(portProcessId));
            DataSetRefs staticFlowPathRefs = new DataSetRefs(previousEvent.getComponentId());
            staticFlowPathRefs.addOutput(queueFromStaticFlowPathToRemotePortProcess);
            this.addDataSetRefs(nifiFlow, staticFlowPathRefs);
            DataSetRefs remotePortRefs = new DataSetRefs(portProcessId);
            remotePortRefs.addInput(queueFromStaticFlowPathToRemotePortProcess);
            remotePortRefs.addOutput(remotePortDataSet);
            this.addDataSetRefs(remotePortRefs, remotePortProcessRef);
        } else {
            List<ConnectionStatus> connections = nifiFlow.getOutgoingConnections(portProcessId);
            if (connections == null || connections.isEmpty()) {
                this.logger.warn("Incoming connection was not found: {}", new Object[]{event});
                return;
            }
            remotePortProcess.setGroupId(connections.get(0).getGroupId());
            Referenceable remotePortProcessRef = this.toReferenceable(remotePortProcess, nifiFlow);
            this.createEntity(remotePortProcessRef);
            DataSetRefs remotePortRefs = new DataSetRefs(portProcessId);
            remotePortRefs.addInput(remotePortDataSet);
            this.addDataSetRefs(remotePortRefs, remotePortProcessRef);
            for (ConnectionStatus connection : connections) {
                String destinationId = connection.getDestinationId();
                NiFiFlowPath destFlowPath = nifiFlow.findPath(destinationId);
                if (destFlowPath == null) continue;
                Referenceable queueFromRemotePortProcessToStaticFlowPath = new Referenceable("nifi_queue", new String[0]);
                queueFromRemotePortProcessToStaticFlowPath.set("name", (Object)"queue");
                queueFromRemotePortProcessToStaticFlowPath.set("qualifiedName", (Object)nifiFlow.toQualifiedName(destinationId));
                DataSetRefs staticFlowPathRefs = new DataSetRefs(destinationId);
                staticFlowPathRefs.addInput(queueFromRemotePortProcessToStaticFlowPath);
                this.addDataSetRefs(nifiFlow, staticFlowPathRefs);
                remotePortRefs.addOutput(queueFromRemotePortProcessToStaticFlowPath);
                this.addDataSetRefs(remotePortRefs, remotePortProcessRef);
            }
            nifiFlow.getFlowPaths().put(remotePortProcess.getId(), remotePortProcess);
        }
    }

    private ProvenanceEventRecord findPreviousProvenanceEvent(AnalysisContext context, ProvenanceEventRecord event) {
        ComputeLineageResult lineage = context.queryLineage(event.getEventId());
        if (lineage == null) {
            this.logger.warn("Lineage was not found: {}", new Object[]{event});
            return null;
        }
        LineageNode previousProvenanceNode = this.traverseLineage(lineage, String.valueOf(event.getEventId()));
        if (previousProvenanceNode == null) {
            this.logger.warn("Traverse lineage could not find any preceding provenance event node: {}", new Object[]{event});
            return null;
        }
        long previousEventId = Long.parseLong(previousProvenanceNode.getIdentifier());
        return context.getProvenanceEvent(previousEventId);
    }

    private LineageNode traverseLineage(ComputeLineageResult lineage, String eventId) {
        LineageNode previousNode = lineage.getEdges().stream().filter(edge -> edge.getDestination().getIdentifier().equals(String.valueOf(eventId))).findFirst().map(LineageEdge::getSource).orElse(null);
        if (previousNode == null) {
            return null;
        }
        if (previousNode.getNodeType().equals((Object)LineageNodeType.PROVENANCE_EVENT_NODE)) {
            return previousNode;
        }
        return this.traverseLineage(lineage, previousNode.getIdentifier());
    }
}

