/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.script;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

@EventDriven
@SupportsBatching
@SideEffectFree
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"record", "transform", "script", "groovy", "jython", "python", "update", "modify", "filter"})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXECUTE_CODE, explanation="Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")})
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute="record.count", description="The number of records in the FlowFile"), @WritesAttribute(attribute="record.error.message", description="This attribute provides on failure the error message encountered by the Reader or Writer.")})
@CapabilityDescription(value="Provides the ability to evaluate a simple script against each record in an incoming FlowFile. The script may transform the record in some way, filter the record, or fork additional records. See Processor's Additional Details for more information.")
@SeeAlso(classNames={"org.apache.nifi.processors.script.ExecuteScript", "org.apache.nifi.processors.standard.UpdateRecord", "org.apache.nifi.processors.standard.QueryRecord", "org.apache.nifi.processors.jolt.record.JoltTransformRecord", "org.apache.nifi.processors.standard.LookupRecord"})
public class ScriptedTransformRecord
extends AbstractProcessor
implements Searchable {
    private static final String PYTHON_SCRIPT_LANGUAGE = "python";
    private static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").displayName("Record Reader").description("The Record Reader to use parsing the incoming FlowFile into Records").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").displayName("Record Writer").description("The Record Writer to use for serializing Records after they have been transformed").required(true).identifiesControllerService(RecordSetWriterFactory.class).build();
    static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder().name("Script Engine").displayName("Script Language").description("The Language to use for the script").allowableValues(SCRIPT_OPTIONS).defaultValue("Groovy").required(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Each FlowFile that were successfully transformed will be routed to this Relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be transformed will be routed to this Relationship").build();
    private volatile String scriptToRun = null;
    private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference();
    private final ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
    private final List<PropertyDescriptor> descriptors = Arrays.asList(RECORD_READER, RECORD_WRITER, LANGUAGE, ScriptingComponentUtils.SCRIPT_BODY, ScriptingComponentUtils.SCRIPT_FILE, ScriptingComponentUtils.MODULES);

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return Collections.unmodifiableSet(relationships);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return this.scriptingComponentHelper.customValidate(validationContext);
    }

    @OnScheduled
    public void setup(ProcessContext context) throws IOException {
        if (!this.scriptingComponentHelper.isInitialized.get()) {
            this.scriptingComponentHelper.createResources();
        }
        this.scriptingComponentHelper.setupVariables((PropertyContext)context);
        int maxTasks = context.getMaxConcurrentTasks();
        this.scriptingComponentHelper.setup(maxTasks, this.getLogger());
        this.scriptToRun = this.scriptingComponentHelper.getScriptBody();
        if (this.scriptToRun == null && this.scriptingComponentHelper.getScriptPath() != null) {
            try (FileInputStream scriptStream = new FileInputStream(this.scriptingComponentHelper.getScriptPath());){
                this.scriptToRun = IOUtils.toString((InputStream)scriptStream, (Charset)Charset.defaultCharset());
            }
        }
        this.compiledScriptRef.set(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        ScriptEngine scriptEngine = (ScriptEngine)this.scriptingComponentHelper.engineQ.poll();
        if (scriptEngine == null) {
            session.rollback();
            return;
        }
        try {
            ScriptEvaluator evaluator;
            try {
                evaluator = this.createEvaluator(scriptEngine, flowFile);
            }
            catch (ScriptException se) {
                this.getLogger().error("Failed to initialize script engine", (Throwable)se);
                session.transfer(flowFile, REL_FAILURE);
                this.scriptingComponentHelper.engineQ.offer(scriptEngine);
                return;
            }
            this.transform(flowFile, evaluator, context, session);
        }
        finally {
            this.scriptingComponentHelper.engineQ.offer(scriptEngine);
        }
    }

    private void transform(final FlowFile flowFile, ScriptEvaluator evaluator, ProcessContext context, ProcessSession session) {
        long startMillis = System.currentTimeMillis();
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        Counts counts = new Counts();
        try {
            HashMap attributesToAdd = new HashMap();
            session.write(flowFile, (in, out) -> {
                final AtomicReference writerReference = new AtomicReference();
                try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
                    WriteResult writeResult;
                    RecordWriteAction writeAction = new RecordWriteAction(){
                        private RecordSetWriter writer = null;

                        @Override
                        public void write(Record record) throws IOException {
                            if (record == null) {
                                return;
                            }
                            record.incorporateInactiveFields();
                            if (this.writer == null) {
                                RecordSchema writerSchema = record.getSchema();
                                try {
                                    this.writer = writerFactory.createWriter(ScriptedTransformRecord.this.getLogger(), writerSchema, out, flowFile);
                                }
                                catch (SchemaNotFoundException e) {
                                    throw new IOException(e);
                                }
                                writerReference.set(this.writer);
                                this.writer.beginRecordSet();
                            }
                            this.writer.write(record);
                        }
                    };
                    try {
                        Record inputRecord;
                        while ((inputRecord = reader.nextRecord()) != null) {
                            this.processRecord(inputRecord, flowFile, counts, writeAction, evaluator);
                        }
                        RecordSetWriter writer = (RecordSetWriter)writerReference.get();
                        if (writer == null) {
                            writer = writerFactory.createWriter(this.getLogger(), reader.getSchema(), out, flowFile);
                            writer.beginRecordSet();
                            writeResult = writer.finishRecordSet();
                            attributesToAdd.put("mime.type", writer.getMimeType());
                        } else {
                            writeResult = writer.finishRecordSet();
                            attributesToAdd.put("mime.type", writer.getMimeType());
                        }
                    }
                    finally {
                        RecordSetWriter writer = (RecordSetWriter)writerReference.get();
                        if (writer != null) {
                            writer.close();
                        }
                    }
                    attributesToAdd.putAll(writeResult.getAttributes());
                    attributesToAdd.put("record.count", String.valueOf(writeResult.getRecordCount()));
                }
                catch (ScriptException | SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException(e);
                }
            });
            session.putAllAttributes(flowFile, attributesToAdd);
            session.transfer(flowFile, REL_SUCCESS);
            long transformCount = counts.getRecordCount() - counts.getDroppedCount();
            this.getLogger().info("Successfully transformed {} Records and dropped {} Records for {}", new Object[]{transformCount, counts.getDroppedCount(), flowFile});
            session.adjustCounter("Records Transformed", transformCount, true);
            session.adjustCounter("Records Dropped", counts.getDroppedCount(), true);
            long millis = System.currentTimeMillis() - startMillis;
            session.getProvenanceReporter().modifyContent(flowFile, "Transformed " + transformCount + " Records, Dropped " + counts.getDroppedCount() + " Records", millis);
        }
        catch (ProcessException e) {
            this.getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[]{counts.getRecordCount(), flowFile}, e.getCause());
            session.transfer(flowFile, REL_FAILURE);
        }
        catch (Exception e) {
            this.getLogger().error("After processing {} Records, encountered failure when attempting to transform {}", new Object[]{counts.getRecordCount(), flowFile}, (Throwable)e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private void processRecord(Record inputRecord, FlowFile flowFile, Counts counts, RecordWriteAction recordWriteAction, ScriptEvaluator evaluator) throws IOException, ScriptException {
        long index = counts.getRecordCount();
        Object returnValue = evaluator.evaluate(inputRecord, index);
        counts.incrementRecordCount();
        if (returnValue == null) {
            this.getLogger().trace("Script returned null for Record {} [{}] so will drop Record from {}", new Object[]{index, inputRecord, flowFile});
            counts.incrementDroppedCount();
            return;
        }
        if (returnValue instanceof Record) {
            Record transformedRecord = (Record)returnValue;
            this.getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, transformedRecord, flowFile});
            recordWriteAction.write(transformedRecord);
            return;
        }
        if (returnValue instanceof Collection) {
            Collection collection = (Collection)returnValue;
            this.getLogger().trace("Successfully transformed Record {} from {} to {} for {}", new Object[]{index, inputRecord, collection, flowFile});
            for (Object element : collection) {
                if (!(element instanceof Record)) {
                    throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile + " but instead of returning a Record or Collection of Records, script returned a Collection of values, at least one of which was not a Record but instead was: " + returnValue);
                }
                recordWriteAction.write((Record)element);
            }
            return;
        }
        throw new RuntimeException("Evaluated script against Record number " + index + " of " + flowFile + " but instead of returning a Record, script returned a value of: " + returnValue);
    }

    private ScriptEvaluator createEvaluator(ScriptEngine scriptEngine, FlowFile flowFile) throws ScriptException {
        if (PYTHON_SCRIPT_LANGUAGE.equalsIgnoreCase(scriptEngine.getFactory().getLanguageName())) {
            CompiledScript compiledScript = this.getOrCompileScript((Compilable)((Object)scriptEngine), this.scriptToRun);
            return new PythonScriptEvaluator(scriptEngine, compiledScript, flowFile);
        }
        return new InterpretedScriptEvaluator(scriptEngine, this.scriptToRun, flowFile);
    }

    private CompiledScript getOrCompileScript(Compilable scriptEngine, String scriptToRun) throws ScriptException {
        CompiledScript existing = this.compiledScriptRef.get();
        if (existing != null) {
            return existing;
        }
        CompiledScript compiled = scriptEngine.compile(scriptToRun);
        boolean updated = this.compiledScriptRef.compareAndSet(null, compiled);
        if (updated) {
            return compiled;
        }
        return this.compiledScriptRef.get();
    }

    private static Bindings setupBindings(ScriptEngine scriptEngine) {
        Bindings bindings = scriptEngine.getBindings(100);
        if (bindings == null) {
            bindings = new SimpleBindings();
        }
        scriptEngine.setBindings(bindings, 100);
        return bindings;
    }

    public Collection<SearchResult> search(SearchContext context) {
        return ScriptingComponentUtils.search(context, this.getLogger());
    }

    private static interface RecordWriteAction {
        public void write(Record var1) throws IOException;
    }

    private static class Counts {
        private long recordCount;
        private long droppedCount;

        private Counts() {
        }

        public long getRecordCount() {
            return this.recordCount;
        }

        public long getDroppedCount() {
            return this.droppedCount;
        }

        public void incrementRecordCount() {
            ++this.recordCount;
        }

        public void incrementDroppedCount() {
            ++this.droppedCount;
        }
    }

    private class InterpretedScriptEvaluator
    implements ScriptEvaluator {
        private final ScriptEngine scriptEngine;
        private final String scriptToRun;
        private final Bindings bindings;

        public InterpretedScriptEvaluator(ScriptEngine scriptEngine, String scriptToRun, FlowFile flowFile) {
            this.scriptEngine = scriptEngine;
            this.scriptToRun = scriptToRun;
            this.bindings = ScriptedTransformRecord.setupBindings(scriptEngine);
            this.bindings.put("attributes", (Object)flowFile.getAttributes());
            this.bindings.put("log", (Object)ScriptedTransformRecord.this.getLogger());
        }

        @Override
        public Object evaluate(Record record, long index) throws ScriptException {
            this.bindings.put("record", (Object)record);
            this.bindings.put("recordIndex", (Object)index);
            return this.scriptEngine.eval(this.scriptToRun, this.bindings);
        }
    }

    private class PythonScriptEvaluator
    implements ScriptEvaluator {
        private final ScriptEngine scriptEngine;
        private final CompiledScript compiledScript;
        private final Bindings bindings;

        public PythonScriptEvaluator(ScriptEngine scriptEngine, CompiledScript compiledScript, FlowFile flowFile) {
            this.compiledScript = compiledScript;
            this.scriptEngine = scriptEngine;
            this.bindings = ScriptedTransformRecord.setupBindings(scriptEngine);
            this.bindings.put("attributes", (Object)flowFile.getAttributes());
            this.bindings.put("log", (Object)ScriptedTransformRecord.this.getLogger());
        }

        @Override
        public Object evaluate(Record record, long index) throws ScriptException {
            this.bindings.put("record", (Object)record);
            this.bindings.put("recordIndex", (Object)index);
            this.compiledScript.eval(this.bindings);
            return this.scriptEngine.get("_");
        }
    }

    private static interface ScriptEvaluator {
        public Object evaluate(Record var1, long var2) throws ScriptException;
    }
}

