package org.apache.nifi.processors.hadoop;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.class */
/**
 * {@link SequenceFileWriter} implementation that rewrites the content of a FlowFile as a Hadoop
 * SequenceFile containing a single record. The record key is the FlowFile's {@code filename}
 * attribute (as {@link Text}) and the record value is the FlowFile content, handed to the writer
 * as an {@link InputStreamWritable}.
 */
public class SequenceFileWriterImpl implements SequenceFileWriter {
    protected static Logger logger = LoggerFactory.getLogger(SequenceFileWriterImpl.class);

    /**
     * Replaces the content of {@code flowFile} with a SequenceFile built from that content.
     *
     * @param flowFile         the FlowFile whose content is wrapped; its size must fit in an {@code int}
     * @param processSession   the session used to rewrite the FlowFile content
     * @param configuration    the Hadoop configuration passed to the SequenceFile writer
     * @param compressionType  the SequenceFile compression type
     * @param compressionCodec the codec used together with {@code compressionType}
     * @return the FlowFile returned by {@code processSession.write}
     * @throws IllegalArgumentException if the FlowFile is larger than {@link Integer#MAX_VALUE} bytes
     */
    @Override // org.apache.nifi.processors.hadoop.util.SequenceFileWriter
    public FlowFile writeSequenceFile(final FlowFile flowFile, ProcessSession processSession, final Configuration configuration, final SequenceFile.CompressionType compressionType, final CompressionCodec compressionCodec) {
        // The record length is later narrowed to an int (see processInputStream), so reject anything larger.
        if (flowFile.getSize() > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Cannot write " + flowFile + " to Sequence File because its size is greater than the largest possible Integer");
        }
        // Only used for the debug log line below.
        final String sequenceFilename = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";

        final byte[] toReplace = InputStreamWritable.class.getCanonicalName().getBytes(StandardCharsets.UTF_8);
        final byte[] replaceWith = BytesWritable.class.getCanonicalName().getBytes(StandardCharsets.UTF_8);

        final StopWatch stopWatch = new StopWatch(true);
        final FlowFile sequenceFlowFile = processSession.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
                // Two substitutions are registered on the output: the InputStreamWritable class name
                // is swapped for the BytesWritable class name, and the byte equal to the former
                // name's length is swapped for the latter's length.
                // NOTE(review): presumably this makes the SequenceFile header declare BytesWritable
                // as the value class while the content is still streamed, and the trailing 1 looks
                // like a maximum replacement count - confirm against ByteFilteringOutputStream.
                final ByteFilteringOutputStream filteringOut = new ByteFilteringOutputStream(outputStream);
                filteringOut.addFilter(toReplace, replaceWith, 1);
                filteringOut.addFilter((byte) InputStreamWritable.class.getCanonicalName().length(), (byte) BytesWritable.class.getCanonicalName().length(), 1);

                // try-with-resources closes the writer first and then the stream, on both the
                // success and the failure path, so the stream is not leaked when writing fails.
                try (FSDataOutputStream fsOut = new FSDataOutputStream(filteringOut, new FileSystem.Statistics(""));
                     SequenceFile.Writer writer = SequenceFile.createWriter(configuration,
                             SequenceFile.Writer.stream(fsOut),
                             SequenceFile.Writer.keyClass(Text.class),
                             SequenceFile.Writer.valueClass(InputStreamWritable.class),
                             SequenceFile.Writer.compression(compressionType, compressionCodec))) {
                    processInputStream(inputStream, flowFile, writer);
                } finally {
                    stopWatch.stop();
                }
            }
        });
        logger.debug("Wrote Sequence File {} ({}).", sequenceFilename, stopWatch.calculateDataRate(flowFile.getSize()));
        return sequenceFlowFile;
    }

    /**
     * Appends one record to {@code writer}: the key is the FlowFile's {@code filename} attribute and
     * the value wraps a buffered view of {@code inputStream} together with the FlowFile size.
     *
     * @param inputStream the FlowFile content
     * @param flowFile    the FlowFile supplying the key and the value length
     * @param writer      the SequenceFile writer to append to
     * @throws IOException if appending to the writer fails
     */
    protected void processInputStream(InputStream inputStream, FlowFile flowFile, SequenceFile.Writer writer) throws IOException {
        final Text key = new Text(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
        // The narrowing cast matches the Integer.MAX_VALUE check made in writeSequenceFile.
        final InputStreamWritable value = new InputStreamWritable(new BufferedInputStream(inputStream), (int) flowFile.getSize());
        writer.append(key, value);
    }
}
