package org.apache.nifi.processors.graph;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("This processor is designed to execute queries in either the Cypher query language or the Tinkerpop Gremlin DSL. It delegates most of the logic to a configured client service that handles the interaction with the remote data source. All of the output is written out as JSON data.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({"cypher", "neo4j", "graph", "network", "insert", "update", "delete", "put", "get", "node", "relationship", "connection", "executor", "gremlin", "tinkerpop"})
@WritesAttributes({@WritesAttribute(attribute = AbstractGraphExecutor.ERROR_MESSAGE, description = "GraphDB error message"), @WritesAttribute(attribute = "graph.labels.added", description = "Number of labels added"), @WritesAttribute(attribute = "graph.nodes.created", description = "Number of nodes created"), @WritesAttribute(attribute = "graph.nodes.deleted", description = "Number of nodes deleted"), @WritesAttribute(attribute = "graph.properties.set", description = "Number of properties set"), @WritesAttribute(attribute = "graph.relations.created", description = "Number of relationships created"), @WritesAttribute(attribute = "graph.relations.deleted", description = "Number of relationships deleted"), @WritesAttribute(attribute = "graph.rows.returned", description = "Number of rows returned"), @WritesAttribute(attribute = ExecuteGraphQuery.EXECUTION_TIME, description = "The amount of time in milliseconds that the querytook to execute.")})
/* loaded from: input_file:org/apache/nifi/processors/graph/ExecuteGraphQuery.class */
public class ExecuteGraphQuery extends AbstractGraphExecutor {
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;
    public static final String EXECUTION_TIME = "query.took";
    protected ObjectMapper mapper = new ObjectMapper();
    private volatile GraphClientService clientService;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override // org.apache.nifi.processors.graph.AbstractGraphExecutor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
        this.clientService = processContext.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        FlowFile create = flowFile != null ? processSession.create(flowFile) : processSession.create();
        try {
            OutputStream write = processSession.write(create);
            Throwable th = null;
            try {
                try {
                    String query = getQuery(processContext, processSession, flowFile);
                    long currentTimeMillis = System.currentTimeMillis();
                    write.write("[".getBytes());
                    Map executeQuery = this.clientService.executeQuery(query, getParameters(processContext, create), (map, z) -> {
                        try {
                            write.write(this.mapper.writeValueAsString(map).getBytes());
                            if (z) {
                                write.write(",".getBytes());
                            }
                        } catch (Exception e) {
                            throw new ProcessException(e);
                        }
                    });
                    write.write("]".getBytes());
                    write.close();
                    String valueOf = String.valueOf(System.currentTimeMillis() - currentTimeMillis);
                    executeQuery.put(EXECUTION_TIME, valueOf);
                    executeQuery.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                    create = processSession.putAllAttributes(create, executeQuery);
                    processSession.transfer(create, REL_SUCCESS);
                    processSession.getProvenanceReporter().invokeRemoteProcess(create, this.clientService.getTransitUrl(), String.format("The following query was executed in %s milliseconds: \"%s\"", valueOf, query));
                    if (flowFile != null) {
                        processSession.transfer(flowFile, REL_ORIGINAL);
                    }
                    if (write != null) {
                        if (0 != 0) {
                            try {
                                write.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            write.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            getLogger().error("Failed to execute graph statement due to {}", new Object[]{e.getLocalizedMessage()}, e);
            processSession.remove(create);
            if (flowFile != null) {
                processSession.transfer(processSession.putAttribute(flowFile, AbstractGraphExecutor.ERROR_MESSAGE, String.valueOf(e.getMessage())), REL_FAILURE);
            }
            processContext.yield();
        }
    }

    protected String getQuery(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile) {
        String value = processContext.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isEmpty(value) && flowFile != null) {
            try {
                if (flowFile.getSize() > 65536) {
                    throw new Exception("Input bigger than 64kb. Cannot assume this is a valid query for Gremlin Server or Neo4J.");
                }
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                processSession.exportTo(flowFile, byteArrayOutputStream);
                byteArrayOutputStream.close();
                value = new String(byteArrayOutputStream.toByteArray());
            } catch (Exception e) {
                throw new ProcessException(e);
            }
        }
        return value;
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_ORIGINAL);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList();
        arrayList.add(CLIENT_SERVICE);
        arrayList.add(QUERY);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
