package org.apache.nifi.processors.ignite.cache;

import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

/**
 * NiFi processor that writes FlowFile content into an Ignite cache through an
 * {@link IgniteDataStreamer}.
 *
 * <p>For every FlowFile in a batch the cache key is obtained by evaluating the
 * {@code IGNITE_CACHE_ENTRY_KEY} property against the FlowFile, and the cache value is the
 * FlowFile content read fully into a byte array. FlowFiles with an empty key or zero-length
 * content are routed to {@code REL_FAILURE} with a reason attribute; the rest are handed to the
 * streamer in a single {@code addData} call and routed to {@code REL_SUCCESS}.
 *
 * <p>The streamer is created when the processor is scheduled and closed when it is stopped.
 */
@CapabilityDescription("Stream the contents of a FlowFile to Ignite Cache using DataStreamer. The processor uses the value of FlowFile attribute (Ignite cache entry key) as the cache key and the byte array of the FlowFile as the value of the cache entry value.  Both the string key and a  non-empty byte array value are required otherwise the FlowFile is transferred to the failure relation. Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message  can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, description = "The total number of FlowFile in the batch"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, description = "The item number of FlowFile in the batch"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, description = "The successful FlowFile item number"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, description = "The number of successful FlowFiles"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, description = "The failed FlowFile item number"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"), @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key")})
@DeprecationNotice(reason = "Apache Ignite 1 is no longer supported")
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"Ignite", "insert", "update", "stream", "write", "put", "cache", "key"})
@SeeAlso({GetIgniteCache.class})
public class PutIgniteCache extends AbstractIgniteCacheProcessor {
    /** Attribute: number of FlowFiles pulled from the session for the batch. */
    public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count";
    /** Attribute: index of the FlowFile within the batch pulled from the session. */
    public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number";
    /** Attribute: number of FlowFiles of the batch that were handed to the streamer. */
    public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT = "ignite.cache.batch.flow.file.successful.count";
    /** Attribute: index of the FlowFile within the successful subset of the batch. */
    public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER = "ignite.cache.batch.flow.file.successful.number";
    /** Attribute: number of FlowFiles of the batch that failed validation. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_COUNT = "ignite.cache.batch.flow.file.failed.count";
    /** Attribute: index of the FlowFile within the failed subset of the batch. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER = "ignite.cache.batch.flow.file.failed.number";
    /** Attribute name for the size of a failed FlowFile; not written by this class. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_FILE_SIZE = "ignite.cache.batch.flow.file.failed.size";
    /** Attribute: human-readable reason why the FlowFile failed validation. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.batch.flow.file.failed.reason";
    /** Failure reason used when the evaluated cache key is empty. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing";
    /** Failure reason used when the FlowFile has no content. */
    public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero";

    /** Streamer created in {@link #initializeIgniteDataStreamer} and released in {@link #closeIgniteDataStreamer}. */
    private transient IgniteDataStreamer<String, byte[]> igniteDataStreamer;

    /** Maximum number of FlowFiles pulled from the session per {@code onTrigger} invocation. */
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().displayName("Batch Size For Entries").name("batch-size-for-entries").description("Batch size for entries (1-500).").defaultValue("250").required(true).addValidator(StandardValidators.createLongValidator(1, 500, true)).sensitive(false).build();
    /** Value passed to {@code IgniteDataStreamer.perNodeParallelOperations}. */
    public static final PropertyDescriptor DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS = new PropertyDescriptor.Builder().displayName("Data Streamer Per Node Parallel Operations").name("data-streamer-per-node-parallel-operations").description("Data streamer per node parallelism").defaultValue("5").required(true).addValidator(StandardValidators.createLongValidator(1, 10, true)).sensitive(false).build();
    /** Value passed to {@code IgniteDataStreamer.perNodeBufferSize}. */
    public static final PropertyDescriptor DATA_STREAMER_PER_NODE_BUFFER_SIZE = new PropertyDescriptor.Builder().displayName("Data Streamer Per Node Buffer Size").name("data-streamer-per-node-buffer-size").description("Data streamer per node buffer size (1-500).").defaultValue("250").required(true).addValidator(StandardValidators.createLongValidator(1, 500, true)).sensitive(false).build();
    /** Value passed to {@code IgniteDataStreamer.autoFlushFrequency}. */
    public static final PropertyDescriptor DATA_STREAMER_AUTO_FLUSH_FREQUENCY = new PropertyDescriptor.Builder().displayName("Data Streamer Auto Flush Frequency in millis").name("data-streamer-auto-flush-frequency-in-millis").description("Data streamer flush interval in millis seconds").defaultValue("10").required(true).addValidator(StandardValidators.createLongValidator(1, 100, true)).sensitive(false).build();
    /** Value passed to {@code IgniteDataStreamer.allowOverwrite}. */
    public static final PropertyDescriptor DATA_STREAMER_ALLOW_OVERRIDE = new PropertyDescriptor.Builder().displayName("Data Streamer Allow Override").name("data-streamer-allow-override").description("Whether to override values already in the cache").defaultValue("false").required(true).allowableValues(new AllowableValue[]{new AllowableValue("true"), new AllowableValue("false")}).sensitive(false).build();

    /** Properties exposed by this processor, in display order. */
    protected static final List<PropertyDescriptor> descriptors = Arrays.asList(IGNITE_CONFIGURATION_FILE, CACHE_NAME, BATCH_SIZE, IGNITE_CACHE_ENTRY_KEY, DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS, DATA_STREAMER_PER_NODE_BUFFER_SIZE, DATA_STREAMER_AUTO_FLUSH_FREQUENCY, DATA_STREAMER_ALLOW_OVERRIDE);

    /**
     * Returns the properties supported by this processor.
     *
     * @return the shared {@link #descriptors} list
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Closes the data streamer, if one exists, and clears the reference to it.
     *
     * <p>{@code IgniteDataStreamer.close()} streams any remaining buffered data before releasing
     * the streamer, so no separate flush is required. The reference is cleared even when closing
     * fails, so a broken streamer is never reused by a later
     * {@link #initializeIgniteDataStreamer} call.
     */
    @OnStopped
    public final void closeIgniteDataStreamer() {
        final IgniteDataStreamer<String, byte[]> streamer = this.igniteDataStreamer;
        if (streamer == null) {
            return;
        }
        getLogger().info("Closing ignite data streamer");
        try {
            streamer.close();
        } finally {
            this.igniteDataStreamer = null;
        }
    }

    /**
     * Closes the data streamer and then the Ignite cache held by the superclass.
     *
     * <p>The cache is closed even if closing the streamer throws.
     */
    @OnShutdown
    public final void closeIgniteDataStreamerAndCache() {
        try {
            closeIgniteDataStreamer();
        } finally {
            super.closeIgniteCache();
        }
    }

    /**
     * Returns the current data streamer.
     *
     * @return the streamer, or {@code null} if none has been created or it has been closed
     */
    protected IgniteDataStreamer<String, byte[]> getIgniteDataStreamer() {
        return this.igniteDataStreamer;
    }

    /**
     * Initializes the Ignite cache and, unless a streamer already exists, creates and configures
     * the data streamer from the processor properties.
     *
     * <p>The streamer is only stored in the field after it has been fully configured. If anything
     * fails along the way, the partially configured streamer is closed (cancelling pending data)
     * so that it is neither leaked nor picked up by a later invocation.
     *
     * @param processContext context supplying the streamer configuration properties
     * @throws ProcessException if the streamer cannot be created or configured
     */
    @OnScheduled
    public final void initializeIgniteDataStreamer(ProcessContext processContext) throws ProcessException {
        super.initializeIgniteCache(processContext);
        if (getIgniteDataStreamer() != null) {
            return;
        }
        getLogger().info("Creating Ignite Datastreamer");
        IgniteDataStreamer<String, byte[]> streamer = null;
        try {
            final int perNodeParallelOperations = processContext.getProperty(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS).asInteger().intValue();
            final int perNodeBufferSize = processContext.getProperty(DATA_STREAMER_PER_NODE_BUFFER_SIZE).asInteger().intValue();
            final int autoFlushFrequency = processContext.getProperty(DATA_STREAMER_AUTO_FLUSH_FREQUENCY).asInteger().intValue();
            final boolean allowOverwrite = processContext.getProperty(DATA_STREAMER_ALLOW_OVERRIDE).asBoolean().booleanValue();

            streamer = getIgnite().dataStreamer(getIgniteCache().getName());
            streamer.perNodeBufferSize(perNodeBufferSize);
            streamer.perNodeParallelOperations(perNodeParallelOperations);
            streamer.autoFlushFrequency(autoFlushFrequency);
            streamer.allowOverwrite(allowOverwrite);

            // Publish only once configuration succeeded.
            this.igniteDataStreamer = streamer;
        } catch (Exception e) {
            if (streamer != null) {
                try {
                    streamer.close(true);
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            getLogger().error("Failed to schedule PutIgnite due to {}", new Object[]{e, e});
            throw new ProcessException(e);
        }
    }

    /**
     * Pulls up to {@code BATCH_SIZE} FlowFiles, validates each one, streams the valid ones to
     * Ignite and routes every FlowFile to success or failure.
     *
     * <p>A FlowFile whose processing throws an {@link Exception} is logged, transferred to
     * {@code REL_FAILURE} and causes the processor to yield; the remaining FlowFiles of the batch
     * are still processed. The collected entries are streamed and the collected FlowFiles are
     * transferred in a {@code finally} block, i.e. also when the loop terminates abnormally.
     *
     * @param processContext context supplying the batch size and cache key properties
     * @param processSession session providing the FlowFiles
     * @throws ProcessException propagated from the attribute update helpers or the session
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final int batchSize = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
        final List<FlowFile> flowFiles = processSession.get(batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }

        final List<AbstractMap.SimpleEntry<String, byte[]>> cacheItems = new ArrayList<>();
        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
        final List<FlowFile> failedFlowFiles = new ArrayList<>();
        try {
            for (final FlowFile flowFile : flowFiles) {
                try {
                    final String key = processContext.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                    if (isFailedFlowFile(flowFile, key)) {
                        failedFlowFiles.add(flowFile);
                        continue;
                    }
                    final byte[] content = readContent(flowFile, processSession);
                    cacheItems.add(new AbstractMap.SimpleEntry<String, byte[]>(key, content));
                    successfulFlowFiles.add(flowFile);
                } catch (Exception e) {
                    getLogger().error("Failed to insert {} into IgniteDB due to {}", new Object[]{flowFile, e, e});
                    processSession.transfer(flowFile, REL_FAILURE);
                    processContext.yield();
                }
            }
        } finally {
            completeBatch(flowFiles, cacheItems, successfulFlowFiles, failedFlowFiles, processSession, processContext);
        }
    }

    /**
     * Reads the whole content of a FlowFile into a newly allocated byte array.
     *
     * @throws ProcessException if the content size does not fit into a Java array; without this
     *         check the narrowing cast would silently truncate the size
     */
    private byte[] readContent(final FlowFile flowFile, final ProcessSession processSession) {
        final long size = flowFile.getSize();
        if (size > Integer.MAX_VALUE) {
            throw new ProcessException("FlowFile content of " + size + " bytes is too large for a single cache entry: " + flowFile);
        }
        final byte[] content = new byte[(int) size];
        processSession.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream inputStream) throws IOException {
                StreamUtils.fillBuffer(inputStream, content, true);
            }
        });
        return content;
    }

    /**
     * Streams the collected entries to Ignite, then transfers the successful FlowFiles (with a
     * provenance SEND event each) and the failed FlowFiles to their relationships.
     */
    private void completeBatch(final List<FlowFile> flowFiles,
                               final List<AbstractMap.SimpleEntry<String, byte[]>> cacheItems,
                               final List<FlowFile> successfulFlowFiles,
                               final List<FlowFile> failedFlowFiles,
                               final ProcessSession processSession,
                               final ProcessContext processContext) {
        if (!cacheItems.isEmpty()) {
            // get() blocks until the streamer has completed the addData operation.
            final Object result = this.igniteDataStreamer.addData(cacheItems).get();
            getLogger().debug("Result {} of addData", new Object[]{result});
        }
        if (!successfulFlowFiles.isEmpty()) {
            final List<FlowFile> updated = updateSuccessfulFlowFileAttributes(flowFiles, successfulFlowFiles, processSession);
            processSession.transfer(updated, REL_SUCCESS);
            final String uriPrefix = "ignite://cache/" + getIgniteCache().getName() + "/";
            for (final FlowFile flowFile : updated) {
                final String key = processContext.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                processSession.getProvenanceReporter().send(flowFile, uriPrefix + key);
            }
        }
        if (!failedFlowFiles.isEmpty()) {
            final List<FlowFile> updated = updateFailedFlowFileAttributes(flowFiles, failedFlowFiles, processSession, processContext);
            processSession.transfer(updated, REL_FAILURE);
        }
    }

    /**
     * Tells whether a FlowFile cannot be stored: its key is empty or it has no content.
     */
    private boolean isFailedFlowFile(FlowFile flowFile, String str) {
        return StringUtils.isEmpty(str) || flowFile.getSize() == 0;
    }

    /**
     * Adds the batch bookkeeping attributes to every successful FlowFile.
     *
     * @param list all FlowFiles of the batch
     * @param list2 the FlowFiles of the batch that were accepted for streaming
     * @param processSession session used to update the attributes
     * @return the updated FlowFiles, in the order of {@code list2}
     */
    protected List<FlowFile> updateSuccessfulFlowFileAttributes(List<FlowFile> list, List<FlowFile> list2, ProcessSession processSession) {
        final int totalCount = list.size();
        final int successfulCount = list2.size();
        final List<FlowFile> updated = new ArrayList<>(successfulCount);
        for (int i = 0; i < successfulCount; i++) {
            final FlowFile flowFile = list2.get(i);
            final HashMap<String, String> attributes = new HashMap<>();
            attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, Integer.toString(i));
            attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(totalCount));
            attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(list.indexOf(flowFile)));
            attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, Integer.toString(successfulCount));
            updated.add(processSession.putAllAttributes(flowFile, attributes));
        }
        return updated;
    }

    /**
     * Adds the batch bookkeeping attributes and the failure reason to every failed FlowFile.
     *
     * @param list all FlowFiles of the batch
     * @param list2 the FlowFiles of the batch that failed validation
     * @param processSession session used to update the attributes
     * @param processContext context used to re-evaluate the cache key of each FlowFile
     * @return the updated FlowFiles, in the order of {@code list2}
     * @throws ProcessException if a FlowFile has both a non-empty key and non-empty content, i.e.
     *         no known failure reason applies
     */
    protected List<FlowFile> updateFailedFlowFileAttributes(List<FlowFile> list, List<FlowFile> list2, ProcessSession processSession, ProcessContext processContext) {
        final int totalCount = list.size();
        final int failedCount = list2.size();
        final List<FlowFile> updated = new ArrayList<>(failedCount);
        for (int i = 0; i < failedCount; i++) {
            final FlowFile flowFile = list2.get(i);
            final HashMap<String, String> attributes = new HashMap<>();
            attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, Integer.toString(i));
            attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(totalCount));
            attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(list.indexOf(flowFile)));
            attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, Integer.toString(failedCount));

            final String key = processContext.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty(key)) {
                attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE);
            } else if (flowFile.getSize() == 0) {
                attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE);
            } else {
                throw new ProcessException("Unknown reason for failing file: " + flowFile);
            }
            updated.add(processSession.putAllAttributes(flowFile, attributes));
        }
        return updated;
    }
}
