package org.apache.nifi.processors.hadoop;

import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;

@DefaultSettings(yieldDuration = "100 ms")
@TriggerWhenEmpty
/* loaded from: input_file:org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.class */
public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
    public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().name("filename").displayName("Filename").description("The name of the file to retrieve").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${path}/${filename}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The service for writing records to the FlowFile content").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles will be routed to this relationship once they have been updated with the content of the file").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved and trying again will likely not be helpful. This would occur, for instance, if the file is not found or if there is a permissions issue").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("FlowFiles will be routed to this relationship if the content of the file cannot be retrieved, but might be able to be in the future if tried again. This generally indicates that the Fetch should be tried again.").build();
    public static final String FETCH_FAILURE_REASON_ATTR = "fetch.failure.reason";
    public static final String RECORD_COUNT_ATTR = "record.count";
    private volatile Set<Relationship> fetchHdfsRecordRelationships;
    private volatile List<PropertyDescriptor> fetchHdfsRecordProperties;

    protected final void init(ProcessorInitializationContext processorInitializationContext) {
        super.init(processorInitializationContext);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_RETRY);
        hashSet.add(REL_FAILURE);
        this.fetchHdfsRecordRelationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList(this.properties);
        arrayList.add(FILENAME);
        arrayList.add(RECORD_WRITER);
        arrayList.addAll(getAdditionalProperties());
        this.fetchHdfsRecordProperties = Collections.unmodifiableList(arrayList);
    }

    public List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.emptyList();
    }

    public final Set<Relationship> getRelationships() {
        return this.fetchHdfsRecordRelationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.fetchHdfsRecordProperties;
    }

    public abstract HDFSRecordReader createHDFSRecordReader(ProcessContext processContext, FlowFile flowFile, Configuration configuration, Path path) throws IOException;

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FileSystem fileSystem = getFileSystem();
        Configuration configuration = getConfiguration();
        UserGroupInformation userGroupInformation = getUserGroupInformation();
        if (configuration == null || fileSystem == null || userGroupInformation == null) {
            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
            processContext.yield();
            return;
        }
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            processContext.yield();
        } else {
            userGroupInformation.doAs(() -> {
                String value = processContext.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
                try {
                    Path path = new Path(value);
                    AtomicReference atomicReference = new AtomicReference(null);
                    AtomicReference atomicReference2 = new AtomicReference();
                    RecordSetWriterFactory asControllerService = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
                    StopWatch stopWatch = new StopWatch(true);
                    FlowFile create = processSession.create(flowFile);
                    AtomicReference atomicReference3 = new AtomicReference();
                    FlowFile write = processSession.write(create, outputStream -> {
                        ?? r18;
                        ?? r19;
                        try {
                            try {
                                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
                                Throwable th = null;
                                try {
                                    HDFSRecordReader createHDFSRecordReader = createHDFSRecordReader(processContext, flowFile, configuration, path);
                                    Throwable th2 = null;
                                    Record nextRecord = createHDFSRecordReader.nextRecord();
                                    RecordSetWriter createWriter = asControllerService.createWriter(getLogger(), asControllerService.getSchema(flowFile.getAttributes(), nextRecord == null ? null : nextRecord.getSchema()), bufferedOutputStream);
                                    Throwable th3 = null;
                                    try {
                                        try {
                                            createWriter.beginRecordSet();
                                            if (nextRecord != null) {
                                                createWriter.write(nextRecord);
                                            }
                                            while (true) {
                                                Record nextRecord2 = createHDFSRecordReader.nextRecord();
                                                if (nextRecord2 == null) {
                                                    break;
                                                } else {
                                                    createWriter.write(nextRecord2);
                                                }
                                            }
                                            atomicReference2.set(createWriter.finishRecordSet());
                                            atomicReference3.set(createWriter.getMimeType());
                                            if (createWriter != null) {
                                                if (0 != 0) {
                                                    try {
                                                        createWriter.close();
                                                    } catch (Throwable th4) {
                                                        th3.addSuppressed(th4);
                                                    }
                                                } else {
                                                    createWriter.close();
                                                }
                                            }
                                            if (createHDFSRecordReader != null) {
                                                if (0 != 0) {
                                                    try {
                                                        createHDFSRecordReader.close();
                                                    } catch (Throwable th5) {
                                                        th2.addSuppressed(th5);
                                                    }
                                                } else {
                                                    createHDFSRecordReader.close();
                                                }
                                            }
                                            if (bufferedOutputStream != null) {
                                                if (0 != 0) {
                                                    try {
                                                        bufferedOutputStream.close();
                                                    } catch (Throwable th6) {
                                                        th.addSuppressed(th6);
                                                    }
                                                } else {
                                                    bufferedOutputStream.close();
                                                }
                                            }
                                        } catch (Throwable th7) {
                                            th3 = th7;
                                            throw th7;
                                        }
                                    } catch (Throwable th8) {
                                        if (createWriter != null) {
                                            if (th3 != null) {
                                                try {
                                                    createWriter.close();
                                                } catch (Throwable th9) {
                                                    th3.addSuppressed(th9);
                                                }
                                            } else {
                                                createWriter.close();
                                            }
                                        }
                                        throw th8;
                                    }
                                } catch (Throwable th10) {
                                    if (r18 != 0) {
                                        if (r19 != 0) {
                                            try {
                                                r18.close();
                                            } catch (Throwable th11) {
                                                r19.addSuppressed(th11);
                                            }
                                        } else {
                                            r18.close();
                                        }
                                    }
                                    throw th10;
                                }
                            } finally {
                            }
                        } catch (Exception e) {
                            atomicReference.set(e);
                        }
                    });
                    stopWatch.stop();
                    if (atomicReference.get() != null) {
                        throw ((Throwable) atomicReference.get());
                    }
                    FlowFile postProcess = postProcess(processContext, processSession, write, path);
                    HashMap hashMap = new HashMap(((WriteResult) atomicReference2.get()).getAttributes());
                    hashMap.put("record.count", String.valueOf(((WriteResult) atomicReference2.get()).getRecordCount()));
                    hashMap.put(CoreAttributes.MIME_TYPE.key(), atomicReference3.get());
                    FlowFile putAllAttributes = processSession.putAllAttributes(postProcess, hashMap);
                    Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
                    getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[]{makeQualified, putAllAttributes, stopWatch.getDuration()});
                    processSession.getProvenanceReporter().fetch(putAllAttributes, makeQualified.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                    processSession.transfer(putAllAttributes, REL_SUCCESS);
                    processSession.remove(flowFile);
                    return null;
                } catch (FileNotFoundException | AccessControlException e) {
                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{value, flowFile, e});
                    processSession.transfer(processSession.putAttribute(flowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage()), REL_FAILURE);
                    if (0 == 0) {
                        return null;
                    }
                    processSession.remove((FlowFile) null);
                    return null;
                } catch (IOException | FlowFileAccessException e2) {
                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[]{value, flowFile, e2});
                    processSession.transfer(processSession.penalize(flowFile), REL_RETRY);
                    processContext.yield();
                    if (0 == 0) {
                    }
                } catch (Throwable th) {
                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{value, flowFile, th});
                    processSession.transfer(processSession.putAttribute(flowFile, FETCH_FAILURE_REASON_ATTR, th.getMessage() == null ? th.toString() : th.getMessage()), REL_FAILURE);
                    if (0 == 0) {
                    }
                }
            });
        }
    }

    protected FlowFile postProcess(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile, Path path) {
        return flowFile;
    }
}
