package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("A processor that repeatedly runs a paginated query against a field using a Range query to consume new Documents from an Elasticsearch index/query. The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, after which the Range query will automatically update the field constraint based on the last retrieved Document value.")
@DynamicProperty(name = "The name of a URL query parameter to add", value = "The value of the URL query parameter", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the query request body. For SCROLL type queries, these parameters are only used in the initial (first page) query as the Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "application/json"), @WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"), @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"), @WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")})
@DefaultSchedule(period = "1 min")
@PrimaryNodeOnly
@Stateful(scopes = {Scope.CLUSTER}, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) is retained in between invocations of this processor until the Scroll/PiT has expired (when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "scroll", "page", "search", "json"})
@SeeAlso({SearchElasticsearch.class, PaginatedJsonQueryElasticsearch.class})
/* loaded from: input_file:org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.class */
public class ConsumeElasticsearch extends SearchElasticsearch {
    /** Key under which the last tracked range value is written to, and read back from, processor state. */
    static final String STATE_RANGE_VALUE = "trackingRangeValue";

    // Copies of the ElasticsearchRestProcessor descriptors with their dependsOn conditions cleared.
    // The static initializer below swaps these in for the inherited descriptors of the same name.
    public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.SIZE).clearDependsOn().build();
    public static final PropertyDescriptor AGGREGATIONS = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.AGGREGATIONS).clearDependsOn().build();
    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.SORT).clearDependsOn().build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.FIELDS).clearDependsOn().build();
    public static final PropertyDescriptor SCRIPT_FIELDS = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.SCRIPT_FIELDS).clearDependsOn().build();

    /** Required: name of the document field used in the "range" filter and in the prepended "sort" clause. */
    public static final PropertyDescriptor RANGE_FIELD = new PropertyDescriptor.Builder().name("es-rest-range-field").displayName("Range Query Field").description("Field to be tracked as part of an Elasticsearch Range query using a \"gt\" bound match. This field must exist within the Elasticsearch document for it to be retrieved.").addValidator(StandardValidators.NON_BLANK_VALIDATOR).required(true).build();
    /** Required: sort direction ("asc" or "desc", default "asc") applied to the range field. */
    public static final PropertyDescriptor RANGE_FIELD_SORT_ORDER = new PropertyDescriptor.Builder().name("es-rest-sort-order").displayName("Sort Order").description("The order in which to sort the \"" + RANGE_FIELD.getDisplayName() + "\". A \"sort\" clause for the \"" + RANGE_FIELD.getDisplayName() + "\" field will be prepended to any provided \"" + SORT.getDisplayName() + "\" clauses. If a \"sort\" clause already exists for the \"" + RANGE_FIELD.getDisplayName() + "\" field, it will not be updated.").allowableValues(new String[]{"asc", "desc"}).defaultValue("asc").required(true).build();
    /** Optional: range bound used when no tracked value is present in processor state. */
    public static final PropertyDescriptor RANGE_INITIAL_VALUE = new PropertyDescriptor.Builder().name("es-rest-range-initial-value").displayName("Initial Value").description("The initial value to use for the query if the processor has not run previously. If the processor has run previously and stored a value in its state, this property will be ignored. If no value is provided, and the processor has not previously run, no Range query bounds will be used, i.e. all documents will be retrieved in the specified \"" + RANGE_FIELD_SORT_ORDER.getDisplayName() + "\".").addValidator(StandardValidators.NON_BLANK_VALIDATOR).required(false).build();
    /** Optional: emitted as the "format" entry of the range filter; declared as depending on RANGE_INITIAL_VALUE. */
    public static final PropertyDescriptor RANGE_DATE_FORMAT = new PropertyDescriptor.Builder().name("es-rest-range-format").displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Format").description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to a date with this format. If not specified, Elasticsearch will use the date format provided by the \"" + RANGE_FIELD.getDisplayName() + "\"'s mapping. For valid syntax, see https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html").addValidator(StandardValidators.NON_BLANK_VALIDATOR).dependsOn(RANGE_INITIAL_VALUE, new AllowableValue[0]).required(false).build();
    /** Optional: emitted as the "time_zone" entry of the range filter; declared as depending on RANGE_INITIAL_VALUE. */
    public static final PropertyDescriptor RANGE_TIME_ZONE = new PropertyDescriptor.Builder().name("es-rest-range-time-zone").displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Time Zone").description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to UTC with this time zone. Valid values are ISO 8601 UTC offsets, such as \"+01:00\" or \"-08:00\", and IANA time zone IDs, such as \"Europe/London\".").addValidator(StandardValidators.NON_BLANK_VALIDATOR).dependsOn(RANGE_INITIAL_VALUE, new AllowableValue[0]).required(false).build();
    /** Optional: JSON object or array of objects appended to the Bool query's "filter" list in addQueryClause. */
    public static final PropertyDescriptor ADDITIONAL_FILTERS = new PropertyDescriptor.Builder().name("es-rest-additional-filters").displayName("Additional Filters").description("One or more query filters in JSON syntax, not Lucene syntax. Ex: [{\"match\":{\"somefield\":\"somevalue\"}}, {\"match\":{\"anotherfield\":\"anothervalue\"}}]. These filters wil be used as part of a Bool query's filter.").addValidator(JsonValidator.INSTANCE).required(false).build();

    /** Unmodifiable descriptor list built once in the static initializer and returned by getSupportedPropertyDescriptors(). */
    private static final List<PropertyDescriptor> propertyDescriptors;

    // Values of RANGE_FIELD / RANGE_FIELD_SORT_ORDER cached by onScheduled() and reset to null by onStopped().
    protected String trackingRangeField;
    protected String trackingSortOrder;

    /**
     * Returns the fixed list of properties this processor exposes.
     *
     * @return the unmodifiable descriptor list assembled in this class's static initializer
     */
    @Override // org.apache.nifi.processors.elasticsearch.SearchElasticsearch, org.apache.nifi.processors.elasticsearch.AbstractJsonQueryElasticsearch
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ConsumeElasticsearch.propertyDescriptors;
    }

    /**
     * Identifies the state scope this processor reads its tracking value from.
     *
     * @return always {@link Scope#CLUSTER}
     */
    @Override // org.apache.nifi.processors.elasticsearch.SearchElasticsearch
    Scope getStateScope() {
        return Scope.CLUSTER;
    }

    /**
     * Runs the inherited scheduling logic and then caches the configured range field
     * and sort order so later calls can use them without a {@link ProcessContext}.
     *
     * @param processContext context supplying the RANGE_FIELD and RANGE_FIELD_SORT_ORDER values
     */
    @Override // org.apache.nifi.processors.elasticsearch.AbstractPaginatedJsonQueryElasticsearch, org.apache.nifi.processors.elasticsearch.AbstractJsonQueryElasticsearch
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);

        trackingRangeField = processContext.getProperty(RANGE_FIELD).getValue();
        trackingSortOrder = processContext.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
    }

    /**
     * Runs the inherited stop logic and then discards the range field and sort order
     * cached by {@code onScheduled}.
     */
    @Override // org.apache.nifi.processors.elasticsearch.AbstractJsonQueryElasticsearch
    @OnStopped
    public void onStopped() {
        super.onStopped();

        trackingRangeField = null;
        trackingSortOrder = null;
    }

    /**
     * Resolves the name of the tracked range field.
     *
     * @param processContext fallback source for the RANGE_FIELD property; may be {@code null}
     * @return the cached field name if present, otherwise the property value read from
     *         {@code processContext}, or {@code null} when neither is available
     */
    private String getTrackingRangeField(ProcessContext processContext) {
        if (this.trackingRangeField != null) {
            return this.trackingRangeField;
        }
        if (processContext == null) {
            return null;
        }
        return processContext.getProperty(RANGE_FIELD).getValue();
    }

    /**
     * Resolves the sort order applied to the tracked range field.
     *
     * @param processContext fallback source for the RANGE_FIELD_SORT_ORDER property; may be {@code null}
     * @return the cached sort order if present, otherwise the property value read from
     *         {@code processContext}, or {@code null} when neither is available
     */
    private String getTrackingSortOrder(ProcessContext processContext) {
        if (this.trackingSortOrder != null) {
            return this.trackingSortOrder;
        }
        if (processContext == null) {
            return null;
        }
        return processContext.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
    }

    /**
     * Builds the inherited query parameters and seeds them with the current tracking
     * range value (from processor state, else the configured initial value).
     *
     * <p>Decompiler note: the access modifier was widened from package-private.
     *
     * @param flowFile       passed through to the superclass
     * @param processContext source of state and properties
     * @param processSession passed through to the superclass
     * @return the superclass's parameters with the tracking range value set
     * @throws IOException if the superclass or the state lookup fails
     */
    @Override // org.apache.nifi.processors.elasticsearch.SearchElasticsearch, org.apache.nifi.processors.elasticsearch.AbstractPaginatedJsonQueryElasticsearch, org.apache.nifi.processors.elasticsearch.AbstractJsonQueryElasticsearch
    public PaginatedJsonQueryParameters buildJsonQueryParameters(FlowFile flowFile, ProcessContext processContext, ProcessSession processSession) throws IOException {
        final PaginatedJsonQueryParameters parameters = super.buildJsonQueryParameters(flowFile, processContext, processSession);
        final String rangeValue = getTrackingRangeValueOrDefault(processContext);
        parameters.setTrackingRangeValue(rangeValue);
        return parameters;
    }

    /**
     * Adds a Bool "filter" query to the request body, made up of an optional "range"
     * filter on the tracked field plus any configured additional filters.
     *
     * <p>The range filter is only added when a non-blank tracking value is available
     * (from processor state, else the initial-value property). Its "format" and
     * "time_zone" entries are added whenever the corresponding properties are set,
     * regardless of where the tracking value came from. If no filter results, the
     * request body is left without a "query" entry.
     *
     * @param map            request body being assembled; receives the "query" entry
     * @param map2           attribute map; not used by this implementation
     * @param processContext source of properties and state
     * @throws IOException if state cannot be read or the additional filters cannot be parsed
     */
    @Override // org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor
    public void addQueryClause(Map<String, Object> map, Map<String, String> map2, final ProcessContext processContext) throws IOException {
        final List<Map<String, Object>> filters = new ArrayList<>(10);

        // Only retrieve documents whose tracked field is greater than the last seen value.
        final String trackingRangeValue = getTrackingRangeValueOrDefault(processContext);
        if (StringUtils.isNotBlank(trackingRangeValue)) {
            final String rangeField = getTrackingRangeField(processContext);

            // Plain HashMap rather than a double-brace anonymous subclass, which would
            // capture the enclosing processor instance for no benefit.
            final Map<String, String> rangeBounds = new HashMap<>(3, 1.0f);
            rangeBounds.put("gt", trackingRangeValue);
            if (processContext.getProperty(RANGE_DATE_FORMAT).isSet()) {
                rangeBounds.put("format", processContext.getProperty(RANGE_DATE_FORMAT).getValue());
            }
            if (processContext.getProperty(RANGE_TIME_ZONE).isSet()) {
                rangeBounds.put("time_zone", processContext.getProperty(RANGE_TIME_ZONE).getValue());
            }

            filters.add(Collections.singletonMap("range", Collections.singletonMap(rangeField, rangeBounds)));
        }

        // The additional filters may be given either as a single JSON object or as an array of objects.
        if (processContext.getProperty(ADDITIONAL_FILTERS).isSet()) {
            final JsonNode additionalFilters = mapper.readTree(processContext.getProperty(ADDITIONAL_FILTERS).getValue());
            if (additionalFilters.isArray()) {
                final List<Map<String, Object>> parsedFilters =
                        mapper.convertValue(additionalFilters, new TypeReference<List<Map<String, Object>>>() { });
                filters.addAll(parsedFilters);
            } else {
                final Map<String, Object> parsedFilter =
                        mapper.convertValue(additionalFilters, new TypeReference<Map<String, Object>>() { });
                filters.add(parsedFilter);
            }
        }

        if (filters.isEmpty()) {
            return;
        }
        map.put("query", Collections.singletonMap("bool", Collections.singletonMap("filter", filters)));
    }

    /**
     * Ensures the request body's "sort" list contains a clause for the tracked range field.
     *
     * <p>The inherited implementation runs first. If the resulting body has no "sort"
     * entry, an empty list is created and stored. A clause mapping the range field to the
     * configured sort order is then inserted at position 0, unless a clause keyed by that
     * field already exists, in which case the list is left unchanged.
     *
     * @param map            request body being assembled; its "sort" entry is created or updated
     * @param map2           attribute map, passed through to the superclass
     * @param processContext source of the range field and sort order when they are not cached
     * @throws IOException if the inherited implementation fails
     */
    @Override // org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor
    public void addSortClause(Map<String, Object> map, Map<String, String> map2, ProcessContext processContext) throws IOException {
        super.addSortClause(map, map2, processContext);

        final List<Map<String, Object>> sortClauses;
        if (map.containsKey("sort")) {
            // NOTE(review): assumes the inherited implementation stores "sort" as a mutable list of maps — confirm.
            @SuppressWarnings("unchecked")
            final List<Map<String, Object>> existingClauses = (List<Map<String, Object>>) map.get("sort");
            sortClauses = existingClauses;
        } else {
            sortClauses = new ArrayList<>(1);
            map.put("sort", sortClauses);
        }

        // Resolve the field once instead of on every element visited by the stream.
        final String rangeField = getTrackingRangeField(processContext);
        if (sortClauses.stream().noneMatch(clause -> clause.containsKey(rangeField))) {
            sortClauses.add(0, Collections.singletonMap(rangeField, getTrackingSortOrder(processContext)));
        }
    }

    /**
     * Adds the current tracking range value to the state entries about to be stored.
     *
     * @param map                          state entries being assembled
     * @param paginatedJsonQueryParameters parameters holding the tracking range value
     */
    @Override // org.apache.nifi.processors.elasticsearch.SearchElasticsearch
    void additionalState(Map<String, String> map, PaginatedJsonQueryParameters paginatedJsonQueryParameters) {
        final String rangeValue = paginatedJsonQueryParameters.getTrackingRangeValue();
        map.put(STATE_RANGE_VALUE, rangeValue);
    }

    /**
     * Updates the tracking range value from the hits of the latest response.
     *
     * <p>After the inherited update, the value is taken from the "_source" of:
     * <ul>
     *   <li>the first hit, when the sort order is "desc" and this is page 1;</li>
     *   <li>the last hit, when the sort order is "asc".</li>
     * </ul>
     * In every other case the tracking value is left unchanged. It is also left unchanged
     * when the chosen hit has no "_source" map or that map has no value for the range
     * field. Previously a missing field was stored as the literal string "null" and a
     * missing "_source" caused a NullPointerException.
     *
     * <p>Decompiler note: the access modifier was widened from package-private.
     *
     * @param paginatedJsonQueryParameters parameters whose tracking range value may be updated
     * @param searchResponse               response whose hits are inspected
     */
    @Override // org.apache.nifi.processors.elasticsearch.AbstractPaginatedJsonQueryElasticsearch
    public void updateQueryParameters(PaginatedJsonQueryParameters paginatedJsonQueryParameters, SearchResponse searchResponse) {
        super.updateQueryParameters(paginatedJsonQueryParameters, searchResponse);

        final List<?> hits = searchResponse.getHits();
        if (hits == null || hits.isEmpty()) {
            return;
        }

        // No ProcessContext is available here, so rely on the values cached by onScheduled().
        final String sortOrder = getTrackingSortOrder(null);
        final int trackingHitIndex;
        if ("desc".equals(sortOrder) && paginatedJsonQueryParameters.getPageCount() == 1) {
            // Descending: the first hit of the first page carries the highest value.
            trackingHitIndex = 0;
        } else if ("asc".equals(sortOrder)) {
            // Ascending: the last hit of every page carries the highest value so far.
            trackingHitIndex = hits.size() - 1;
        } else {
            return;
        }

        final Object hit = hits.get(trackingHitIndex);
        final Object source = hit instanceof Map ? ((Map<?, ?>) hit).get("_source") : null;
        if (!(source instanceof Map)) {
            // No usable "_source" on the hit; keep the previous value.
            return;
        }

        final Object nextValue = ((Map<?, ?>) source).get(getTrackingRangeField(null));
        if (nextValue == null) {
            // String.valueOf(null) would yield "null" and corrupt the stored tracking value.
            return;
        }

        final String nextTrackingValue = String.valueOf(nextValue);
        if (StringUtils.isNotBlank(nextTrackingValue)) {
            paginatedJsonQueryParameters.setTrackingRangeValue(nextTrackingValue);
        }
    }

    /**
     * Looks up the value to use as the lower bound of the range filter.
     *
     * @param processContext source of the state manager and the RANGE_INITIAL_VALUE property
     * @return the value stored in state under STATE_RANGE_VALUE when present, otherwise
     *         the value of the RANGE_INITIAL_VALUE property
     * @throws IOException if the state cannot be retrieved
     */
    private String getTrackingRangeValueOrDefault(ProcessContext processContext) throws IOException {
        final StateMap stateMap = processContext.getStateManager().getState(getStateScope());
        if (stateMap != null && stateMap.get(STATE_RANGE_VALUE) != null) {
            return stateMap.get(STATE_RANGE_VALUE);
        }
        return processContext.getProperty(RANGE_INITIAL_VALUE).getValue();
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.add(RANGE_FIELD);
        arrayList.add(RANGE_FIELD_SORT_ORDER);
        arrayList.add(RANGE_INITIAL_VALUE);
        arrayList.add(RANGE_DATE_FORMAT);
        arrayList.add(RANGE_TIME_ZONE);
        arrayList.add(ADDITIONAL_FILTERS);
        arrayList.addAll((Collection) scrollPropertyDescriptors.stream().filter(propertyDescriptor -> {
            return (QUERY.equals(propertyDescriptor) || QUERY_CLAUSE.equals(propertyDescriptor) || QUERY_DEFINITION_STYLE.equals(propertyDescriptor)) ? false : true;
        }).collect(Collectors.toList()));
        arrayList.set(arrayList.indexOf(ElasticsearchRestProcessor.SIZE), SIZE);
        arrayList.set(arrayList.indexOf(ElasticsearchRestProcessor.AGGREGATIONS), AGGREGATIONS);
        arrayList.set(arrayList.indexOf(ElasticsearchRestProcessor.SORT), SORT);
        arrayList.set(arrayList.indexOf(ElasticsearchRestProcessor.FIELDS), FIELDS);
        arrayList.set(arrayList.indexOf(ElasticsearchRestProcessor.SCRIPT_FIELDS), SCRIPT_FIELDS);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
