/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.jolt;

import com.bazaarvoice.jolt.ContextualTransform;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.Transform;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.jolt.AbstractJoltTransform;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;

@SideEffectFree
@SupportsBatching
@Tags(value={"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"), @WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate")})
@CapabilityDescription(value="Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created with transformed content and is routed to the 'success' relationship. If the transform fails, the original FlowFile is routed to the 'failure' relationship.")
@RequiresInstanceClassLoading
public class JoltTransformRecord
extends AbstractJoltTransform {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("jolt-record-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("jolt-record-record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile transformed;
        FlowFile original;
        block51: {
            original = session.get();
            if (original == null) {
                return;
            }
            ComponentLog logger = this.getLogger();
            StopWatch stopWatch = new StopWatch(true);
            RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            transformed = null;
            try (InputStream in = session.read(original);
                 RecordReader reader = readerFactory.createRecordReader(original, in, this.getLogger());){
                RecordSchema schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());
                HashMap<String, String> attributes = new HashMap<String, String>();
                transformed = session.create(original);
                Record firstRecord = reader.nextRecord();
                if (firstRecord == null) {
                    try (OutputStream out = session.write(transformed);
                         RecordSetWriter writer = writerFactory.createWriter(this.getLogger(), schema, out, transformed);){
                        writer.beginRecordSet();
                        WriteResult writeResult = writer.finishRecordSet();
                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                        attributes.putAll(writeResult.getAttributes());
                    }
                    transformed = session.putAllAttributes(transformed, attributes);
                    logger.info("{} had no Records to transform", new Object[]{original});
                    break block51;
                }
                JoltTransform transform = this.getTransform(context, original);
                List<Record> transformedFirstRecords = this.transform(firstRecord, transform);
                if (transformedFirstRecords.isEmpty()) {
                    throw new ProcessException("Error transforming the first record");
                }
                Record transformedFirstRecord = transformedFirstRecords.get(0);
                if (transformedFirstRecord == null) {
                    throw new ProcessException("Error transforming the first record");
                }
                RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema());
                try (OutputStream out = session.write(transformed);
                     RecordSetWriter writer = writerFactory.createWriter(this.getLogger(), writeSchema, out, transformed);){
                    Record record;
                    writer.beginRecordSet();
                    writer.write(transformedFirstRecord);
                    for (int i = 1; i < transformedFirstRecords.size(); ++i) {
                        record = transformedFirstRecords.get(i);
                        if (record == null) {
                            throw new ProcessException("Error transforming the first record");
                        }
                        writer.write(record);
                    }
                    while ((record = reader.nextRecord()) != null) {
                        List<Record> transformedRecords = this.transform(record, transform);
                        for (Record transformedRecord : transformedRecords) {
                            writer.write(transformedRecord);
                        }
                    }
                    WriteResult writeResult = writer.finishRecordSet();
                    try {
                        writer.close();
                    }
                    catch (IOException ioe) {
                        this.getLogger().warn("Failed to close Writer for {}", new Object[]{transformed});
                    }
                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    attributes.putAll(writeResult.getAttributes());
                }
                String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
                transformed = session.putAllAttributes(transformed, attributes);
                session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                logger.debug("Transform completed {}", new Object[]{original});
            }
            catch (Exception e) {
                logger.error("Transform failed for {}", new Object[]{original, e});
                session.transfer(original, REL_FAILURE);
                if (transformed != null) {
                    session.remove(transformed);
                }
                return;
            }
        }
        if (transformed != null) {
            session.transfer(transformed, REL_SUCCESS);
        }
        session.transfer(original, REL_ORIGINAL);
    }

    private List<Record> transform(Record record, JoltTransform transform) {
        Map recordMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
        recordMap = (Map)JoltTransformRecord.normalizeJoltObjects(recordMap);
        Object transformedObject = JoltTransformRecord.transform(transform, recordMap);
        Object normalizedRecordValues = JoltTransformRecord.normalizeRecordObjects(transformedObject);
        ArrayList<Record> recordList = new ArrayList<Record>();
        if (normalizedRecordValues == null) {
            return recordList;
        }
        if (normalizedRecordValues instanceof Object[]) {
            for (Object o : (Object[])normalizedRecordValues) {
                if (o == null) continue;
                recordList.add(DataTypeUtils.toRecord((Object)o, (String)"r"));
            }
        } else {
            recordList.add(DataTypeUtils.toRecord((Object)normalizedRecordValues, (String)"r"));
        }
        return recordList;
    }

    protected static Object transform(JoltTransform joltTransform, Object input) {
        return joltTransform instanceof ContextualTransform ? ((ContextualTransform)joltTransform).transform(input, Collections.emptyMap()) : ((Transform)joltTransform).transform(input);
    }

    protected static Object normalizeJoltObjects(Object o) {
        if (o instanceof Map) {
            Map m = (Map)o;
            m.forEach((k, v) -> m.put(k, JoltTransformRecord.normalizeJoltObjects(v)));
            return m;
        }
        if (o instanceof Object[]) {
            return Arrays.stream((Object[])o).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList());
        }
        if (o instanceof Collection) {
            Collection c = (Collection)o;
            return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList());
        }
        return o;
    }

    protected static Object normalizeRecordObjects(Object o) {
        if (o instanceof Map) {
            Map m = (Map)o;
            m.forEach((k, v) -> m.put(k, JoltTransformRecord.normalizeRecordObjects(v)));
            return m;
        }
        if (o instanceof List) {
            List objectList = (List)o;
            Object[] objectArray = new Object[objectList.size()];
            for (int i = 0; i < objectArray.length; ++i) {
                objectArray[i] = JoltTransformRecord.normalizeRecordObjects(objectList.get(i));
            }
            return objectArray;
        }
        if (o instanceof Collection) {
            Collection c = (Collection)o;
            ArrayList<Object> objectList = new ArrayList<Object>();
            for (Object obj : c) {
                objectList.add(JoltTransformRecord.normalizeRecordObjects(obj));
            }
            return objectList;
        }
        return o;
    }

    static {
        ArrayList<PropertyDescriptor> tmp = new ArrayList<PropertyDescriptor>(AbstractJoltTransform.PROPERTIES);
        tmp.add(RECORD_READER);
        tmp.add(RECORD_WRITER);
        PROPERTIES = Collections.unmodifiableList(tmp);
        RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL);
    }
}

