package org.apache.nifi.processors.aws.kinesis.stream.record;

import com.amazonaws.services.kinesis.model.Record;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StopWatch;

/* loaded from: input_file:org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.class */
public class KinesisRecordProcessorRecord extends AbstractKinesisRecordProcessor {
    final RecordReaderFactory readerFactory;
    final RecordSetWriterFactory writerFactory;
    final Map<String, String> schemaRetrievalVariables;
    private RecordSetWriter writer;
    private OutputStream outputStream;

    public KinesisRecordProcessorRecord(ProcessSessionFactory processSessionFactory, ComponentLog componentLog, String str, String str2, String str3, long j, long j2, int i, DateTimeFormatter dateTimeFormatter, RecordReaderFactory recordReaderFactory, RecordSetWriterFactory recordSetWriterFactory) {
        super(processSessionFactory, componentLog, str, str2, str3, j, j2, i, dateTimeFormatter);
        this.readerFactory = recordReaderFactory;
        this.writerFactory = recordSetWriterFactory;
        this.schemaRetrievalVariables = Collections.singletonMap(AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor
    public void startProcessingRecords() {
        super.startProcessingRecords();
        this.outputStream = null;
        this.writer = null;
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.nifi.processors.aws.kinesis.stream.record.AbstractKinesisRecordProcessor
    void processRecord(List<FlowFile> list, Record record, boolean z, ProcessSession processSession, StopWatch stopWatch) {
        boolean z2 = true;
        int i = 0;
        byte[] array = record.getData() != null ? record.getData().array() : new byte[0];
        FlowFile flowFile = null;
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(array);
            Throwable th = null;
            try {
                RecordReader createRecordReader = this.readerFactory.createRecordReader(this.schemaRetrievalVariables, byteArrayInputStream, array.length, getLogger());
                Throwable th2 = null;
                try {
                    try {
                        PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(createRecordReader.createRecordSet());
                        while (true) {
                            org.apache.nifi.serialization.record.Record next = pushBackRecordSet.next();
                            if (next == null) {
                                break;
                            }
                            if (list.isEmpty()) {
                                flowFile = processSession.create();
                                list.add(flowFile);
                                createWriter(flowFile, processSession, next);
                            }
                            WriteResult write = this.writer.write(next);
                            i += write.getRecordCount();
                            if (z && !pushBackRecordSet.isAnotherRecord()) {
                                completeFlowFile(list, processSession, i, write, record, stopWatch);
                            }
                            z2 = false;
                        }
                        if (createRecordReader != null) {
                            if (0 != 0) {
                                try {
                                    createRecordReader.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createRecordReader.close();
                            }
                        }
                        if (byteArrayInputStream != null) {
                            if (0 != 0) {
                                try {
                                    byteArrayInputStream.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                byteArrayInputStream.close();
                            }
                        }
                    } catch (Throwable th5) {
                        th2 = th5;
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (createRecordReader != null) {
                        if (th2 != null) {
                            try {
                                createRecordReader.close();
                            } catch (Throwable th7) {
                                th2.addSuppressed(th7);
                            }
                        } else {
                            createRecordReader.close();
                        }
                    }
                    throw th6;
                }
            } catch (Throwable th8) {
                if (byteArrayInputStream != null) {
                    if (0 != 0) {
                        try {
                            byteArrayInputStream.close();
                        } catch (Throwable th9) {
                            th.addSuppressed(th9);
                        }
                    } else {
                        byteArrayInputStream.close();
                    }
                }
                throw th8;
            }
        } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
            getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
            outputRawRecordOnException(z2, flowFile, list, processSession, array, record, e);
        }
        if (getLogger().isDebugEnabled()) {
            getLogger().debug("Sequence No: {}, Partition Key: {}, Data: {}", new Object[]{record.getSequenceNumber(), record.getPartitionKey(), BASE_64_ENCODER.encodeToString(array)});
        }
    }

    private void createWriter(FlowFile flowFile, ProcessSession processSession, org.apache.nifi.serialization.record.Record record) throws IOException, SchemaNotFoundException {
        RecordSchema schema = this.writerFactory.getSchema(this.schemaRetrievalVariables, record.getSchema());
        this.outputStream = processSession.write(flowFile);
        this.writer = this.writerFactory.createWriter(getLogger(), schema, this.outputStream, flowFile);
        this.writer.beginRecordSet();
    }

    private void completeFlowFile(List<FlowFile> list, ProcessSession processSession, int i, WriteResult writeResult, Record record, StopWatch stopWatch) throws IOException {
        try {
            try {
                this.writer.finishRecordSet();
                try {
                    this.writer.close();
                    this.outputStream.close();
                } catch (IOException e) {
                    getLogger().warn("Failed to close Record Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
                }
                reportProvenance(processSession, list.get(0), null, null, stopWatch);
                Map<String, String> defaultAttributes = getDefaultAttributes(record);
                defaultAttributes.put("record.count", String.valueOf(i));
                defaultAttributes.put(CoreAttributes.MIME_TYPE.key(), this.writer.getMimeType());
                defaultAttributes.putAll(writeResult.getAttributes());
                list.set(0, processSession.putAllAttributes(list.get(0), defaultAttributes));
                this.writer = null;
                this.outputStream = null;
            } catch (Throwable th) {
                try {
                    this.writer.close();
                    this.outputStream.close();
                } catch (IOException e2) {
                    getLogger().warn("Failed to close Record Writer due to {}", new Object[]{e2.getLocalizedMessage(), e2});
                }
                throw th;
            }
        } catch (IOException e3) {
            getLogger().error("Failed to finish record output due to {}", new Object[]{e3.getLocalizedMessage(), e3});
            processSession.remove(list.get(0));
            list.remove(0);
            throw e3;
        }
    }

    private void outputRawRecordOnException(boolean z, FlowFile flowFile, List<FlowFile> list, ProcessSession processSession, byte[] bArr, Record record, Exception exc) {
        if (z && flowFile != null) {
            processSession.remove(flowFile);
            list.remove(0);
            if (this.writer != null) {
                try {
                    this.writer.close();
                    this.outputStream.close();
                } catch (IOException e) {
                    getLogger().warn("Failed to close Record Writer due to {}", new Object[]{e.getLocalizedMessage(), e});
                }
            }
        }
        FlowFile create = processSession.create();
        processSession.write(create, outputStream -> {
            outputStream.write(bArr);
        });
        Map<String, String> defaultAttributes = getDefaultAttributes(record);
        Throwable cause = exc.getCause() != null ? exc.getCause() : exc;
        defaultAttributes.put("record.error.message", cause.getLocalizedMessage() != null ? cause.getLocalizedMessage() : cause.getClass().getCanonicalName() + " Thrown");
        transferTo(ConsumeKinesisStream.REL_PARSE_FAILURE, processSession, 0, 0, Collections.singletonList(processSession.putAllAttributes(create, defaultAttributes)));
    }

    private Map<String, String> getDefaultAttributes(Record record) {
        return getDefaultAttributes(record.getSequenceNumber(), record.getPartitionKey(), record.getApproximateArrivalTimestamp());
    }
}
