package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as the index to insert into and the type of the document. If the cluster has been configured for authorization and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor supports Elasticsearch 2.x clusters.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags({"elasticsearch", "insert", "update", "write", "put"})
/* loaded from: input_file:org/apache/nifi/processors/elasticsearch/PutElasticsearch.class */
public class PutElasticsearch extends AbstractElasticsearchTransportClientProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder().name("Identifier Attribute").description("The name of the attribute containing the identifier for each FlowFile").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("Index").description("The name of the index to insert into").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("Type").description("The type of this document (used by Elasticsearch for indexing and searching)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder().name("Index Operation").description("The type of the operation used to index (index, update, upsert)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).defaultValue("index").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The preferred number of FlowFiles to put to the database in a single transaction").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override // org.apache.nifi.processors.elasticsearch.AbstractElasticsearchProcessor
    @OnScheduled
    public void setup(ProcessContext processContext) {
        super.setup(processContext);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        BulkItemResponse[] items;
        ComponentLog logger = getLogger();
        String value = processContext.getProperty(ID_ATTRIBUTE).getValue();
        List<FlowFile> list = processSession.get(processContext.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger().intValue());
        if (list.isEmpty()) {
            return;
        }
        LinkedList linkedList = new LinkedList(list);
        try {
            final BulkRequestBuilder prepareBulk = this.esClient.get().prepareBulk();
            if (this.authToken != null) {
                prepareBulk.putHeader("Authorization", this.authToken);
            }
            for (FlowFile flowFile : list) {
                final String value2 = processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
                final String value3 = processContext.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
                final String value4 = processContext.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
                final Charset forName = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
                final String attribute = flowFile.getAttribute(value);
                if (attribute == null) {
                    logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{value, flowFile});
                    linkedList.remove(flowFile);
                    processSession.transfer(flowFile, REL_FAILURE);
                } else {
                    processSession.read(flowFile, new InputStreamCallback() { // from class: org.apache.nifi.processors.elasticsearch.PutElasticsearch.1
                        public void process(InputStream inputStream) throws IOException {
                            String replace = IOUtils.toString(inputStream, forName).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
                            if (value4.equalsIgnoreCase("index")) {
                                prepareBulk.add(PutElasticsearch.this.esClient.get().prepareIndex(value2, value3, attribute).setSource(replace.getBytes(forName)));
                            } else if (value4.equalsIgnoreCase("upsert")) {
                                prepareBulk.add(PutElasticsearch.this.esClient.get().prepareUpdate(value2, value3, attribute).setDoc(replace.getBytes(forName)).setDocAsUpsert(true));
                            } else {
                                if (!value4.equalsIgnoreCase("update")) {
                                    throw new IOException("Index operation: " + value4 + " not supported.");
                                }
                                prepareBulk.add(PutElasticsearch.this.esClient.get().prepareUpdate(value2, value3, attribute).setDoc(replace.getBytes(forName)));
                            }
                        }
                    });
                }
            }
            BulkResponse bulkResponse = (BulkResponse) prepareBulk.execute().actionGet();
            if (bulkResponse.hasFailures() && (items = bulkResponse.getItems()) != null && items.length > 0) {
                for (int length = items.length - 1; length >= 0; length--) {
                    FlowFile flowFile2 = (FlowFile) linkedList.get(length);
                    if (items[length].isFailed()) {
                        logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", new Object[]{flowFile2, items[length].getFailure().getMessage()});
                        processSession.transfer(flowFile2, REL_FAILURE);
                    } else {
                        processSession.getProvenanceReporter().send(flowFile2, processContext.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + items[length].getIndex());
                        processSession.transfer(flowFile2, REL_SUCCESS);
                    }
                    linkedList.remove(flowFile2);
                }
            }
            linkedList.forEach(flowFile3 -> {
                processSession.transfer(flowFile3, REL_SUCCESS);
                processSession.getProvenanceReporter().send(flowFile3, processContext.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile3).getValue());
            });
        } catch (Exception e) {
            logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure", new Object[]{e.getLocalizedMessage()}, e);
            processSession.transfer(linkedList, REL_FAILURE);
            processContext.yield();
        } catch (NoNodeAvailableException | ElasticsearchTimeoutException | ReceiveTimeoutTransportException | NodeClosedException e2) {
            logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in the NiFi logs.", new Object[]{e2.getLocalizedMessage()}, e2);
            processSession.transfer(linkedList, REL_RETRY);
            processContext.yield();
        }
    }

    @Override // org.apache.nifi.processors.elasticsearch.AbstractElasticsearchTransportClientProcessor
    @OnStopped
    public void closeClient() {
        super.closeClient();
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList();
        arrayList.add(CLUSTER_NAME);
        arrayList.add(HOSTS);
        arrayList.add(PROP_SSL_CONTEXT_SERVICE);
        arrayList.add(PROP_SHIELD_LOCATION);
        arrayList.add(USERNAME);
        arrayList.add(PASSWORD);
        arrayList.add(PING_TIMEOUT);
        arrayList.add(SAMPLER_INTERVAL);
        arrayList.add(ID_ATTRIBUTE);
        arrayList.add(INDEX);
        arrayList.add(TYPE);
        arrayList.add(CHARSET);
        arrayList.add(BATCH_SIZE);
        arrayList.add(INDEX_OP);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
