package org.apache.nifi.processors.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. The file in HDFS is left intact without any changes being made to it.")
@WritesAttribute(attribute = "hdfs.failure.reason", description = "When a FlowFile is routed to 'failure', this attribute is added indicating why the file could not be fetched from HDFS")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
/* loaded from: input_file:org/apache/nifi/processors/hadoop/FetchHDFS.class */
public class FetchHDFS extends AbstractHadoopProcessor {
    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().name("HDFS Filename").description("The name of the HDFS file to retrieve").required(true).expressionLanguageSupported(true).defaultValue("${path}/${filename}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. This would occur, for instance, if the file is not found or if there is a permissions issue").build();
    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder().name("comms.failure").description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. This generally indicates that the Fetch should be tried again.").build();

    @Override // org.apache.nifi.processors.hadoop.AbstractHadoopProcessor
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(this.properties);
        arrayList.add(FILENAME);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_COMMS_FAILURE);
        return hashSet;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        FileSystem fileSystem = getFileSystem();
        String value = processContext.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
        try {
            Path path = new Path(value);
            URI uri = path.toUri();
            StopWatch stopWatch = new StopWatch(true);
            try {
                FSDataInputStream open = fileSystem.open(path, 16384);
                Throwable th = null;
                try {
                    try {
                        FlowFile importFrom = processSession.importFrom(open, flowFile);
                        stopWatch.stop();
                        getLogger().info("Successfully received content from {} for {} in {}", new Object[]{uri, importFrom, stopWatch.getDuration()});
                        processSession.getProvenanceReporter().fetch(importFrom, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                        processSession.transfer(importFrom, REL_SUCCESS);
                        if (open != null) {
                            if (0 != 0) {
                                try {
                                    open.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                open.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (open != null) {
                        if (th != null) {
                            try {
                                open.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            open.close();
                        }
                    }
                    throw th4;
                }
            } catch (FileNotFoundException | AccessControlException e) {
                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{uri, flowFile, e});
                processSession.transfer(processSession.penalize(processSession.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage())), REL_FAILURE);
            } catch (IOException e2) {
                getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[]{uri, flowFile, e2});
                processSession.transfer(processSession.penalize(flowFile), REL_COMMS_FAILURE);
            }
        } catch (IllegalArgumentException e3) {
            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{value, flowFile, e3});
            processSession.transfer(processSession.penalize(processSession.putAttribute(flowFile, "hdfs.failure.reason", e3.getMessage())), REL_FAILURE);
        }
    }
}
