package org.apache.nifi.python.processor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.JsonSchemaInference;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.json.OutputGrouping;
import org.apache.nifi.json.WriteJsonResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
/* loaded from: input_file:org/apache/nifi/python/processor/RecordTransformProxy.class */
public class RecordTransformProxy extends PythonProcessorProxy {
    /** Transform obtained from the processor adapter in {@code setProcessContext}; invoked by {@code onTrigger}. */
    private volatile RecordTransform transform;
    /** Required property identifying the {@link RecordReaderFactory} controller service used to read incoming FlowFile content. */
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading incoming data").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    /** Required property identifying the {@link RecordSetWriterFactory} controller service used to write the transformed records. */
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/python/processor/RecordTransformProxy$DestinationTuple.class */
    /**
     * Pairs an output FlowFile created by {@code writeResult} with the writer that appends records to it.
     *
     * <p>Declared as a record so the compiler derives the {@code flowFile()} and {@code writer()} accessors
     * together with {@code equals}, {@code hashCode} and {@code toString}. A nested record is implicitly
     * {@code static} and {@code final}.
     *
     * @param flowFile the FlowFile that receives the transformed records
     * @param writer   the record writer bound to that FlowFile's content
     */
    public record DestinationTuple(FlowFile flowFile, RecordSetWriter writer) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/python/processor/RecordTransformProxy$RecordGroupingKey.class */
    /**
     * Map key that groups transformed records: results with an equal relationship name and an equal
     * partition map are written to the same output FlowFile.
     *
     * <p>Declared as a record so the compiler derives the {@code relationship()} and {@code partition()}
     * accessors together with value-based {@code equals}, {@code hashCode} and {@code toString}. A nested
     * record is implicitly {@code static} and {@code final}.
     *
     * @param relationship name of the relationship the grouped records are routed to
     * @param partition    partition values shared by the grouped records; may be {@code null}
     */
    public record RecordGroupingKey(String relationship, Map<String, Object> partition) {
    }

    /**
     * Creates the proxy by handing both arguments unchanged to the {@code PythonProcessorProxy} constructor.
     *
     * @param str      forwarded to the superclass; presumably the processor type name (TODO confirm against PythonProcessorProxy)
     * @param supplier supplies the {@code PythonProcessorBridge}; forwarded to the superclass
     */
    public RecordTransformProxy(String str, Supplier<PythonProcessorBridge> supplier) {
        super(str, supplier);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the Record Reader and Record Writer properties, followed by every property the parent proxy supports.
     *
     * @return a new mutable list of the supported property descriptors
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(List.of(RECORD_READER, RECORD_WRITER));
        descriptors.addAll(super.getSupportedPropertyDescriptors());
        return descriptors;
    }

    /**
     * Looks up the {@link RecordTransform} behind the processor bridge, stores it, and passes it the process context.
     *
     * @param processContext the context handed to the transform
     * @throws IllegalStateException if the bridge or its processor adapter is not available yet
     */
    @OnScheduled
    public void setProcessContext(ProcessContext processContext) {
        final var bridge = getBridge()
                .orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));

        final Optional<?> adapter = bridge.getProcessorAdapter();
        final PythonProcessorAdapter processorAdapter = (PythonProcessorAdapter) adapter
                .orElseThrow(() -> new IllegalStateException(this + " is not finished initializing"));

        this.transform = (RecordTransform) processorAdapter.getProcessor();
        this.transform.setContext(processContext);
    }

    /** Number of buffered JSON bytes at which the records read so far are handed to the transform as one batch. */
    private static final int MAX_BATCH_BYTES = 1000000;

    /**
     * Reads the incoming FlowFile record by record, passes the records to the transform in JSON batches,
     * and writes every transform result to an output FlowFile chosen by relationship and partition.
     *
     * <p>On success the output FlowFiles are transferred to their relationships and the incoming FlowFile
     * goes to {@code REL_ORIGINAL}. On any exception the incoming FlowFile goes to {@code REL_FAILURE}
     * and every output FlowFile created so far is closed and removed.
     *
     * @param processContext supplies the configured record reader and writer factories
     * @param processSession session used to read, create, and transfer FlowFiles
     * @throws ProcessException declared by the signature; this body routes caught exceptions to failure instead
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final Map<RecordGroupingKey, DestinationTuple> destinations = new HashMap<>();
        final FlowFileAttributeMap attributes = new FlowFileAttributeMap(flowFile);

        try {
            final Map<Relationship, List<FlowFile>> flowFilesByRelationship;

            // try-with-resources closes the reader and the input stream on the failure path as well,
            // so the incoming FlowFile no longer has an open stream when it is routed below.
            try (InputStream in = processSession.read(flowFile);
                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {

                final RecordSchema schema = reader.getSchema();
                long recordsRead = 0;
                long recordsWritten = 0;

                try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
                    final WriteJsonResult jsonWriter = new WriteJsonResult(getLogger(), schema, new NopSchemaAccessWriter(), buffer, false,
                            NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, (String) null, (String) null, (String) null);

                    int buffered = 0;
                    Record record;
                    while ((record = reader.nextRecord()) != null) {
                        recordsRead++;
                        if (buffered == 0) {
                            // Every batch is its own record set (a JSON array, per OUTPUT_ARRAY).
                            jsonWriter.beginRecordSet();
                        }
                        jsonWriter.writeRawRecord(record);
                        buffered++;

                        if (buffer.size() >= MAX_BATCH_BYTES) {
                            final String json = drainBatch(jsonWriter, buffer);
                            recordsWritten += writeResults(this.transform.transformRecord(json, schema, attributes),
                                    destinations, writerFactory, processSession, flowFile);
                            buffered = 0;
                        }
                    }

                    // Hand over whatever is left in the final, partially filled batch.
                    if (buffered > 0) {
                        final String json = drainBatch(jsonWriter, buffer);
                        recordsWritten += writeResults(this.transform.transformRecord(json, schema, attributes),
                                destinations, writerFactory, processSession, flowFile);
                    }
                }

                flowFilesByRelationship = mapResults(destinations, processSession);
                processSession.adjustCounter("Records Read", recordsRead, false);
                processSession.adjustCounter("Record Written", recordsWritten, false);
            }

            // Transfers happen only after the reader and input stream are closed; they stay inside the
            // outer try so that a failing transfer is still handled by the failure path below.
            flowFilesByRelationship.forEach((relationship, flowFiles) -> processSession.transfer(flowFiles, relationship));
            processSession.transfer(flowFile, REL_ORIGINAL);
        } catch (Exception e) {
            getLogger().error("Failed to transform {}; routing to failure", flowFile, e);
            processSession.transfer(flowFile, REL_FAILURE);

            for (DestinationTuple destination : destinations.values()) {
                // Close the writer before removing its FlowFile.
                // NOTE(review): the session is expected to reject removal while an OutputStream from
                // write(FlowFile) is still open - confirm against the ProcessSession contract.
                try {
                    destination.writer().close();
                } catch (IOException closeFailure) {
                    getLogger().warn("Failed to close Record Writer for FlowFile created in this session", closeFailure);
                }
                processSession.remove(destination.flowFile());
            }
        }
    }

    /**
     * Finishes the current JSON record set and returns the buffered bytes as text, leaving the buffer empty.
     *
     * @param jsonWriter writer whose output goes into {@code buffer}
     * @param buffer     in-memory sink holding the JSON written so far
     * @return the buffered JSON, decoded as UTF-8 (the encoding this class uses when parsing JSON back)
     * @throws IOException if finishing or flushing the writer fails
     */
    private static String drainBatch(WriteJsonResult jsonWriter, ByteArrayOutputStream buffer) throws IOException {
        jsonWriter.finishRecordSet();
        jsonWriter.flush();
        // Explicit charset: the no-arg toString() decodes with the platform default.
        // NOTE(review): assumes WriteJsonResult emits UTF-8 - confirm.
        final String json = buffer.toString(StandardCharsets.UTF_8);
        buffer.reset();
        return json;
    }

    /**
     * Writes each transform result to its destination and reports how many results were handled.
     *
     * @param results        results returned by the transform for one batch
     * @param destinations   open destinations keyed by relationship and partition; new ones are added as needed
     * @param writerFactory  factory used to create writers for new destinations
     * @param processSession session that owns the FlowFiles
     * @param flowFile       the incoming FlowFile the results derive from
     * @return the number of results iterated, including results that produced no record
     */
    private long writeResults(Iterable<RecordTransformResult> results, Map<RecordGroupingKey, DestinationTuple> destinations,
                              RecordSetWriterFactory writerFactory, ProcessSession processSession, FlowFile flowFile)
            throws SchemaNotFoundException, IOException, MalformedRecordException {
        long handled = 0;
        for (RecordTransformResult result : results) {
            writeResult(result, destinations, writerFactory, processSession, flowFile);
            handled++;
        }
        return handled;
    }

    /**
     * Finishes and closes every destination writer, stamps the resulting attributes on its FlowFile,
     * and groups the updated FlowFiles by the relationship named in the grouping key.
     *
     * <p>Each FlowFile receives the writer's result attributes, {@code record.count}, {@code mime.type},
     * and one attribute per partition entry (values converted with {@link Objects#toString(Object)}).
     *
     * @param map            open destinations keyed by relationship and partition
     * @param processSession session used to update FlowFile attributes
     * @return the updated FlowFiles, grouped by target relationship
     * @throws IOException if finishing or closing a writer fails
     */
    private Map<Relationship, List<FlowFile>> mapResults(Map<RecordGroupingKey, DestinationTuple> map, ProcessSession processSession) throws IOException {
        final Map<Relationship, List<FlowFile>> flowFilesByRelationship = new HashMap<>();

        for (Map.Entry<RecordGroupingKey, DestinationTuple> entry : map.entrySet()) {
            final DestinationTuple destination = entry.getValue();
            final RecordSetWriter writer = destination.writer();

            final WriteResult writeResult = writer.finishRecordSet();
            writer.close();

            final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
            attributes.put("mime.type", writer.getMimeType());

            final RecordGroupingKey groupingKey = entry.getKey();
            final Map<String, Object> partition = groupingKey.partition();
            if (partition != null) {
                for (Map.Entry<String, Object> field : partition.entrySet()) {
                    attributes.put(field.getKey(), Objects.toString(field.getValue()));
                }
            }

            // Resolve the target list first, then update the FlowFile - same order as before.
            final Relationship relationship = new Relationship.Builder().name(groupingKey.relationship()).build();
            final List<FlowFile> flowFiles = flowFilesByRelationship.computeIfAbsent(relationship, ignored -> new ArrayList<>());
            flowFiles.add(processSession.putAllAttributes(destination.flowFile(), attributes));
        }

        return flowFilesByRelationship;
    }

    /**
     * Converts one transform result to a record and appends it to the destination for its
     * relationship and partition, creating that destination on first use.
     *
     * <p>A result that yields no record is logged at debug level and skipped.
     *
     * @param recordTransformResult  the result returned by the transform
     * @param map                    open destinations keyed by relationship and partition; updated in place
     * @param recordSetWriterFactory factory used to create the writer for a new destination
     * @param processSession         session used to create and write the output FlowFile
     * @param flowFile               the incoming FlowFile; parent of any FlowFile created here
     * @throws SchemaNotFoundException  if the writer factory cannot determine the write schema
     * @throws IOException              if parsing the result or writing the record fails
     * @throws MalformedRecordException if the result JSON cannot be read as a record
     */
    private void writeResult(RecordTransformResult recordTransformResult, Map<RecordGroupingKey, DestinationTuple> map, RecordSetWriterFactory recordSetWriterFactory, ProcessSession processSession, FlowFile flowFile) throws SchemaNotFoundException, IOException, MalformedRecordException {
        final Record record = createRecordFromJson(recordTransformResult);
        if (record == null) {
            getLogger().debug("Received null result from RecordTransform; will not write result to output for {}", flowFile);
            return;
        }

        final RecordGroupingKey groupingKey = new RecordGroupingKey(recordTransformResult.getRelationship(), recordTransformResult.getPartition());
        DestinationTuple destination = map.get(groupingKey);
        if (destination == null) {
            final FlowFile created = processSession.create(flowFile);
            OutputStream out = null;
            try {
                out = processSession.write(created);
                final Map<String, String> attributes = flowFile.getAttributes();
                final RecordSchema writeSchema = recordSetWriterFactory.getSchema(attributes, record.getSchema());
                final RecordSetWriter writer = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, attributes);
                writer.beginRecordSet();
                destination = new DestinationTuple(created, writer);
                map.put(groupingKey, destination);
            } catch (Exception e) {
                // The destination was never registered, so nothing else will close this stream.
                // Close it before removing the FlowFile.
                // NOTE(review): the session is expected to reject removal while the stream is open - confirm.
                if (out != null) {
                    try {
                        out.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                processSession.remove(created);
                throw e;
            }
        }

        destination.writer().write(record);
    }

    /**
     * Parses the JSON carried by a transform result into a record.
     *
     * <p>The result's own schema is used when present; otherwise a schema is inferred from the JSON.
     *
     * @param recordTransformResult the result whose JSON is parsed
     * @return the first record read from the JSON, or {@code null} if the reader returns none
     * @throws IOException              if reading the JSON or inferring the schema fails
     * @throws MalformedRecordException if the JSON cannot be read as a record
     */
    private Record createRecordFromJson(RecordTransformResult recordTransformResult) throws IOException, MalformedRecordException {
        final byte[] jsonBytes = recordTransformResult.getRecordJson().getBytes(StandardCharsets.UTF_8);

        RecordSchema readSchema = recordTransformResult.getSchema();
        if (readSchema == null) {
            readSchema = inferSchema(jsonBytes);
        }

        try (ByteArrayInputStream jsonStream = new ByteArrayInputStream(jsonBytes)) {
            final JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(jsonStream, getLogger(), readSchema, (String) null, (String) null, (String) null);
            return jsonReader.nextRecord(false, false);
        }
    }

    /**
     * Infers a record schema from raw JSON bytes.
     *
     * @param bArr the JSON document to inspect
     * @return the schema inferred by {@link JsonSchemaInference}
     * @throws IOException if the JSON cannot be read
     */
    private RecordSchema inferSchema(byte[] bArr) throws IOException {
        try (ByteArrayInputStream jsonStream = new ByteArrayInputStream(bArr)) {
            final TimeValueInference timeValueInference = new TimeValueInference((String) null, (String) null, (String) null);
            final JsonSchemaInference schemaInference = new JsonSchemaInference(timeValueInference);
            return schemaInference.inferSchema(new JsonRecordSource(jsonStream));
        }
    }
}
