/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.elasticsearch.AbstractPaginatedJsonQueryElasticsearch;
import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.util.StringUtils;

@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="application/json"), @WritesAttribute(attribute="aggregation.name", description="The name of the aggregation whose results are in the output flowfile"), @WritesAttribute(attribute="aggregation.number", description="The number of the aggregation whose results are in the output flowfile"), @WritesAttribute(attribute="page.number", description="The number of the page (request) in which the results were returned that are in the output flowfile"), @WritesAttribute(attribute="hit.count", description="The number of hits that are in the output flowfile"), @WritesAttribute(attribute="elasticsearch.query.error", description="The error message provided by Elasticsearch if there is an error querying the index.")})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@PrimaryNodeOnly
@DefaultSchedule(period="1 min")
@Tags(value={"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "query", "scroll", "page", "search", "json"})
@CapabilityDescription(value="A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will restart with the first page of results being retrieved.")
@DynamicProperty(name="The name of a URL query parameter to add", value="The value of the URL query parameter", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the query request body")
@Stateful(scopes={Scope.LOCAL}, description="The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp) is retained in between invocations of this processor until the Scroll/PiT has expired (when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="Care should be taken on the size of each page because each response from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class SearchElasticsearch
extends AbstractPaginatedJsonQueryElasticsearch {
    static final String STATE_SCROLL_ID = "scrollId";
    static final String STATE_PIT_ID = "pitId";
    static final String STATE_SEARCH_AFTER = "searchAfter";
    static final String STATE_PAGE_EXPIRATION_TIMESTAMP = "pageExpirationTimestamp";
    static final String STATE_PAGE_COUNT = "pageCount";
    static final String STATE_HIT_COUNT = "hitCount";
    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.QUERY).name("el-rest-query").description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}.").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    PaginatedJsonQueryParameters buildJsonQueryParameters(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
        PaginatedJsonQueryParameters paginatedQueryJsonParameters = super.buildJsonQueryParameters(input, context, session);
        StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
        paginatedQueryJsonParameters.setHitCount(stateMap.get(STATE_HIT_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_HIT_COUNT)));
        paginatedQueryJsonParameters.setPageCount(stateMap.get(STATE_PAGE_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_PAGE_COUNT)));
        paginatedQueryJsonParameters.setScrollId(stateMap.get(STATE_SCROLL_ID));
        paginatedQueryJsonParameters.setSearchAfter(stateMap.get(STATE_SEARCH_AFTER));
        paginatedQueryJsonParameters.setPitId(stateMap.get(STATE_PIT_ID));
        paginatedQueryJsonParameters.setPageExpirationTimestamp(stateMap.get(STATE_PAGE_EXPIRATION_TIMESTAMP));
        return paginatedQueryJsonParameters;
    }

    @Override
    void finishQuery(FlowFile input, PaginatedJsonQueryParameters paginatedQueryJsonParameters, ProcessSession session, ProcessContext context, SearchResponse response) throws IOException {
        if (response.getHits().isEmpty()) {
            this.getLogger().debug("No more results for paginated query, resetting local state for future queries");
            this.resetProcessorState(context);
        } else {
            this.getLogger().debug("Updating local state for next execution");
            HashMap<String, String> newStateMap = new HashMap<String, String>();
            if (PAGINATION_SCROLL.getValue().equals(this.paginationType)) {
                newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
            } else {
                newStateMap.put(STATE_SEARCH_AFTER, response.getSearchAfter());
                if (PAGINATION_POINT_IN_TIME.getValue().equals(this.paginationType)) {
                    newStateMap.put(STATE_PIT_ID, response.getPitId());
                }
            }
            newStateMap.put(STATE_HIT_COUNT, Integer.toString(paginatedQueryJsonParameters.getHitCount()));
            newStateMap.put(STATE_PAGE_COUNT, Integer.toString(paginatedQueryJsonParameters.getPageCount()));
            newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, paginatedQueryJsonParameters.getPageExpirationTimestamp());
            context.getStateManager().setState(newStateMap, Scope.LOCAL);
        }
    }

    @Override
    boolean isExpired(PaginatedJsonQueryParameters paginatedJsonQueryParameters, ProcessContext context, SearchResponse response) throws IOException {
        boolean expiredQuery;
        boolean bl = expiredQuery = StringUtils.isNotEmpty((String)paginatedJsonQueryParameters.getPageExpirationTimestamp()) && Instant.ofEpochMilli(Long.parseLong(paginatedJsonQueryParameters.getPageExpirationTimestamp())).isBefore(Instant.now());
        if (expiredQuery) {
            this.getLogger().debug("Existing paginated query has expired, resetting for new query");
            this.resetProcessorState(context);
            paginatedJsonQueryParameters.setPageCount(0);
            paginatedJsonQueryParameters.setHitCount(0);
            paginatedJsonQueryParameters.setPageExpirationTimestamp(null);
            paginatedJsonQueryParameters.setPitId(null);
            paginatedJsonQueryParameters.setScrollId(null);
            paginatedJsonQueryParameters.setSearchAfter(null);
        }
        return expiredQuery;
    }

    @Override
    String getScrollId(ProcessContext context, SearchResponse response) throws IOException {
        return response == null || StringUtils.isBlank((String)response.getScrollId()) ? context.getStateManager().getState(Scope.LOCAL).get(STATE_SCROLL_ID) : response.getScrollId();
    }

    @Override
    String getPitId(ProcessContext context, SearchResponse response) throws IOException {
        return response == null || StringUtils.isBlank((String)response.getScrollId()) ? context.getStateManager().getState(Scope.LOCAL).get(STATE_PIT_ID) : response.getPitId();
    }

    private void resetProcessorState(ProcessContext context) throws IOException {
        context.getStateManager().clear(Scope.LOCAL);
    }

    static {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_HITS);
        rels.add(REL_AGGREGATIONS);
        relationships = Collections.unmodifiableSet(rels);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(QUERY);
        descriptors.addAll(paginatedPropertyDescriptors);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }
}

