package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries to fetch a single document from Elasticsearch by _id. Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
@DynamicProperty(name = "The name of a URL query parameter to add", value = "The value of the URL query parameter", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index", "record"})
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), @WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"), @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")})
/* loaded from: input_file:org/apache/nifi/processors/elasticsearch/GetElasticsearch.class */
public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
    public static final String VERIFICATION_STEP_DOCUMENT_EXISTS = "Elasticsearch Document Exists";
    private final ObjectMapper mapper = new ObjectMapper();
    private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
    static final AllowableValue FLOWFILE_CONTENT = new AllowableValue("flowfile-content", "FlowFile Content", "Output the retrieved document as the FlowFile content.");
    static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue("flowfile-attribute", "FlowFile Attribute", "Output the retrieved document as a FlowFile attribute specified by the Attribute Name.");
    public static final PropertyDescriptor ID = new PropertyDescriptor.Builder().name("get-es-id").displayName("Document Id").description("The _id of the document to retrieve.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder().name("get-es-destination").displayName("Destination").description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute.").required(true).allowableValues(new AllowableValue[]{FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE}).defaultValue(FLOWFILE_CONTENT.getValue()).build();
    static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("get-es-attribute-name").displayName("Attribute Name").description("The name of the FlowFile attribute to use for the retrieved document output.").required(true).defaultValue("elasticsearch.doc").dependsOn(DESTINATION, new AllowableValue[]{FLOWFILE_ATTRIBUTE}).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final Relationship REL_DOC = new Relationship.Builder().name("document").description("Fetched documents are routed to this relationship.").build();
    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found").description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.").build();
    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE));
    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND)));

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().name(str).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dynamic(true).build();
    }

    @Override // org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor
    public boolean isIndexNotExistSuccessful() {
        return false;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.clientService.set(processContext.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
    }

    @OnStopped
    public void onStopped() {
        this.clientService.set(null);
    }

    @Override // org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor
    public List<ConfigVerificationResult> verifyAfterIndex(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map, ElasticSearchClientService elasticSearchClientService, String str, boolean z) {
        ArrayList arrayList = new ArrayList();
        ConfigVerificationResult.Builder verificationStepName = new ConfigVerificationResult.Builder().verificationStepName(VERIFICATION_STEP_DOCUMENT_EXISTS);
        if (z && processContext.getProperty(ID).isSet()) {
            String value = processContext.getProperty(TYPE).evaluateAttributeExpressions(map).getValue();
            String value2 = processContext.getProperty(ID).evaluateAttributeExpressions(map).getValue();
            try {
                HashMap hashMap = new HashMap(getUrlQueryParameters(processContext, map));
                hashMap.putIfAbsent("_source", "false");
                elasticSearchClientService.get(str, value, value2, hashMap);
                verificationStepName.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Document [%s] exists in index [%s]", value2, str));
            } catch (ElasticsearchException e) {
                if (e.isNotFound()) {
                    verificationStepName.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Document [%s] does not exist in index [%s]", value2, str));
                } else {
                    handleDocumentExistsCheckException(e, verificationStepName, componentLog, str, value2);
                }
            } catch (Exception e2) {
                handleDocumentExistsCheckException(e2, verificationStepName, componentLog, str, value2);
            }
        } else {
            verificationStepName.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation(z ? String.format("No %s specified for document existence check", ID.getDisplayName()) : String.format("Index %s does not exist for document existence check", str));
        }
        arrayList.add(verificationStepName.build());
        return arrayList;
    }

    private void handleDocumentExistsCheckException(Exception exc, ConfigVerificationResult.Builder builder, ComponentLog componentLog, String str, String str2) {
        componentLog.error("Error checking whether document [{}] exists in index [{}]", new Object[]{str2, str, exc});
        builder.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to check whether document [%s] exists in index [%s]", str2, str));
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        FlowFile flowFile = processSession.get();
        String value = processContext.getProperty(ID).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String value3 = processContext.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        String value4 = processContext.getProperty(DESTINATION).getValue();
        String value5 = processContext.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        try {
            if (StringUtils.isBlank(value)) {
                throw new ProcessException(ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document");
            }
            StopWatch stopWatch = new StopWatch(true);
            Map map = this.clientService.get().get(value2, value3, value, getUrlQueryParameters(processContext, flowFile));
            HashMap hashMap = new HashMap(4, 1.0f);
            hashMap.put("filename", value);
            hashMap.put("elasticsearch.index", value2);
            if (value3 != null) {
                hashMap.put("elasticsearch.type", value3);
            }
            String writeValueAsString = this.mapper.writeValueAsString(map);
            FlowFile create = flowFile != null ? flowFile : processSession.create();
            if (FLOWFILE_CONTENT.getValue().equals(value4)) {
                create = processSession.write(create, outputStream -> {
                    outputStream.write(writeValueAsString.getBytes());
                });
            } else {
                hashMap.put(value5, writeValueAsString);
            }
            FlowFile putAllAttributes = processSession.putAllAttributes(create, hashMap);
            processSession.getProvenanceReporter().receive(putAllAttributes, this.clientService.get().getTransitUrl(value2, value3), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            processSession.transfer(putAllAttributes, REL_DOC);
        } catch (Exception e) {
            getLogger().error("Could not fetch document.", e);
            if (flowFile != null) {
                processSession.transfer(processSession.putAttribute(flowFile, "elasticsearch.get.error", e.getMessage()), REL_FAILURE);
            }
            processContext.yield();
        } catch (ElasticsearchException e2) {
            handleElasticsearchException(e2, flowFile, processSession, value2, value3, value);
        }
    }

    private void handleElasticsearchException(ElasticsearchException elasticsearchException, FlowFile flowFile, ProcessSession processSession, String str, String str2, String str3) {
        if (elasticsearchException.isNotFound()) {
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_NOT_FOUND);
                return;
            } else {
                getLogger().warn("Document with _id {} not found in index {} (and type {})", new Object[]{str3, str, str2});
                return;
            }
        }
        Object[] objArr = new Object[1];
        objArr[0] = elasticsearchException.isElastic() ? "Routing to retry." : "Routing to failure";
        getLogger().error(String.format("Encountered a server-side problem with Elasticsearch. %s", objArr), elasticsearchException);
        if (flowFile != null) {
            processSession.penalize(flowFile);
            processSession.transfer(processSession.putAttribute(flowFile, "elasticsearch.get.error", elasticsearchException.getMessage()), elasticsearchException.isElastic() ? REL_RETRY : REL_FAILURE);
        }
    }
}
