package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the identifier of the document to retrieve. Note that the full body of the document will be read into memory before being written to a Flow File for transfer.")
@DynamicProperty(name = "A URL query parameter", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@Tags({"elasticsearch", "fetch", "read", "get", "http"})
/* loaded from: input_file:org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.class */
public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are read from Elasticsearch are routed to this relationship.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming flow files will be routed to failure.").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.").build();
    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found").description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.").build();
    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("fetch-es-doc-id").displayName("Document Identifier").description("The identifier of the document to be fetched").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("fetch-es-index").displayName("Index").description("The name of the index to read from.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("fetch-es-type").displayName("Type").description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, the first document matching the identifier across all types will be retrieved.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder().name("fetch-es-fields").displayName("Fields").description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, then the entire document's source will be retrieved.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override // org.apache.nifi.processors.elasticsearch.AbstractElasticsearchProcessor
    @OnScheduled
    public void setup(ProcessContext processContext) {
        super.setup(processContext);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = null;
        if (processContext.hasIncomingConnection()) {
            flowFile = processSession.get();
            if (flowFile == null && processContext.hasNonLoopConnection()) {
                return;
            }
        }
        OkHttpClient client = getClient();
        if (flowFile == null) {
            flowFile = processSession.create();
        }
        String value = processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
        String value3 = processContext.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        String value4 = processContext.getProperty(FIELDS).isSet() ? processContext.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue() : null;
        String value5 = processContext.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
        String value6 = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
        Charset forName = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        ComponentLog logger = getLogger();
        Response response = null;
        try {
            try {
                try {
                    logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{value, value3, value2});
                    URL buildRequestURL = buildRequestURL(StringUtils.trimToEmpty(processContext.getProperty(ES_URL).evaluateAttributeExpressions().getValue()), value2, value, value3, value4, processContext);
                    long nanoTime = System.nanoTime();
                    Response sendRequestToElasticsearch = sendRequestToElasticsearch(client, buildRequestURL, value5, value6, "GET", null);
                    int code = sendRequestToElasticsearch.code();
                    if (isSuccess(code)) {
                        JsonNode parseJsonResponse = parseJsonResponse(new ByteArrayInputStream(sendRequestToElasticsearch.body().bytes()));
                        boolean asBoolean = parseJsonResponse.get("found").asBoolean(false);
                        String asText = parseJsonResponse.get("_index").asText();
                        String asText2 = parseJsonResponse.get("_type").asText();
                        String asText3 = parseJsonResponse.get("_id").asText();
                        if (asBoolean) {
                            JsonNode jsonNode = parseJsonResponse.get("_source");
                            FlowFile putAttribute = processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(flowFile, "filename", asText3), "es.index", asText), "es.type", asText2);
                            if (jsonNode != null) {
                                putAttribute = processSession.write(putAttribute, outputStream -> {
                                    outputStream.write(jsonNode.toString().getBytes(forName));
                                });
                            }
                            logger.debug("Elasticsearch document " + asText3 + " fetched, routing to success");
                            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                            if (processContext.hasNonLoopConnection()) {
                                processSession.getProvenanceReporter().fetch(putAttribute, buildRequestURL.toExternalForm(), millis);
                            } else {
                                processSession.getProvenanceReporter().receive(putAttribute, buildRequestURL.toExternalForm(), millis);
                            }
                            processSession.transfer(putAttribute, REL_SUCCESS);
                        } else {
                            logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found", new Object[]{value, value3, value2});
                            processSession.transfer(flowFile, REL_NOT_FOUND);
                        }
                    } else if (code == 404) {
                        logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found", new Object[]{value, value3, value2});
                        processSession.transfer(flowFile, REL_NOT_FOUND);
                    } else if (code / 100 == 5) {
                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", new Object[]{Integer.valueOf(code), sendRequestToElasticsearch.message()});
                        processSession.transfer(flowFile, REL_RETRY);
                        processContext.yield();
                    } else if (processContext.hasIncomingConnection()) {
                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{Integer.valueOf(code), sendRequestToElasticsearch.message()});
                        processSession.transfer(flowFile, REL_FAILURE);
                    } else {
                        logger.warn("Elasticsearch returned code {} with message {}", new Object[]{Integer.valueOf(code), sendRequestToElasticsearch.message()});
                        processSession.remove(flowFile);
                    }
                    if (sendRequestToElasticsearch != null) {
                        sendRequestToElasticsearch.close();
                    }
                } catch (Exception e) {
                    logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
                    if (processContext.hasIncomingConnection()) {
                        processSession.transfer(flowFile, REL_FAILURE);
                    } else {
                        processSession.remove(flowFile);
                    }
                    processContext.yield();
                    if (0 != 0) {
                        response.close();
                    }
                }
            } catch (IOException e2) {
                logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.). Routing to retry", new Object[]{e2.getLocalizedMessage()}, e2);
                if (processContext.hasIncomingConnection()) {
                    processSession.transfer(flowFile, REL_RETRY);
                } else {
                    processSession.remove(flowFile);
                }
                processContext.yield();
                if (0 != 0) {
                    response.close();
                }
            }
        } catch (Throwable th) {
            if (0 != 0) {
                response.close();
            }
            throw th;
        }
    }

    private URL buildRequestURL(String str, String str2, String str3, String str4, String str5, ProcessContext processContext) throws MalformedURLException {
        if (StringUtils.isEmpty(str)) {
            throw new MalformedURLException("Base URL cannot be null");
        }
        HttpUrl.Builder newBuilder = HttpUrl.parse(str).newBuilder();
        newBuilder.addPathSegment(str3);
        newBuilder.addPathSegment(StringUtils.isEmpty(str4) ? "_all" : str4);
        newBuilder.addPathSegment(str2);
        if (!StringUtils.isEmpty(str5)) {
            newBuilder.addQueryParameter("_source_include", (String) Stream.of((Object[]) str5.split(",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.joining(",")));
        }
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            PropertyDescriptor propertyDescriptor = (PropertyDescriptor) entry.getKey();
            if (propertyDescriptor.isDynamic() && entry.getValue() != null) {
                newBuilder.addQueryParameter(propertyDescriptor.getName(), processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue());
            }
        }
        return newBuilder.build().url();
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_RETRY);
        hashSet.add(REL_NOT_FOUND);
        relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList(COMMON_PROPERTY_DESCRIPTORS);
        arrayList.add(DOC_ID);
        arrayList.add(INDEX);
        arrayList.add(TYPE);
        arrayList.add(FIELDS);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
