package org.apache.nifi.processors.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
@TriggerWhenEmpty
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."), @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
@SeeAlso({PutHDFS.class, ListHDFS.class})
/* loaded from: input_file:org/apache/nifi/processors/hadoop/GetHDFS.class */
public class GetHDFS extends AbstractHadoopProcessor {
    public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    public static final int BUFFER_SIZE_DEFAULT = 4096;
    public static final int MAX_WORKING_QUEUE_SIZE = 25000;
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files retrieved from HDFS are transferred to this relationship").build();
    public static final Relationship REL_PASSTHROUGH = new Relationship.Builder().name("passthrough").description("If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship").build();
    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name(AbstractHadoopProcessor.DIRECTORY_PROP_NAME).description("The HDFS directory from which files should be read").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder().name("Recurse Subdirectories").description("Indicates whether to pull files from subdirectories of the HDFS directory").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder().name("Keep Source File").description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder().name("File Filter Regex").description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched").required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder().name("Filter Match Name Only").description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder().name("Ignore Dotted Files").description("If true, files whose names begin with a dot (\".\") will be ignored").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder().name("Minimum File Age").description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored").required(true).addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)).defaultValue("0 sec").build();
    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder().name("Maximum File Age").description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored").required(false).addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The maximum number of files to pull in each iteration, based on run schedule.").required(true).defaultValue("100").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder().name("Polling Interval").description("Indicates how long to wait between performing directory listings").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("0 sec").build();
    public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder().name("IO Buffer Size").description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    protected static final List<PropertyDescriptor> localProperties;
    protected ProcessorConfiguration processorConfig;
    private final AtomicLong logEmptyListing = new AtomicLong(2);
    private final AtomicLong lastPollTime = new AtomicLong(0);
    private final Lock listingLock = new ReentrantLock();
    private final Lock queueLock = new ReentrantLock();
    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue(MAX_WORKING_QUEUE_SIZE);
    private final BlockingQueue<Path> processing = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/hadoop/GetHDFS$ProcessorConfiguration.class */
    public static class ProcessorConfiguration {
        private final Path configuredRootDirPath;
        private final Pattern fileFilterPattern;
        private final boolean ignoreDottedFiles;
        private final boolean filterMatchBasenameOnly;
        private final long minimumAge;
        private final long maximumAge;
        private final boolean recurseSubdirs;
        private final PathFilter pathFilter;

        ProcessorConfiguration(ProcessContext processContext) {
            this.configuredRootDirPath = new Path(processContext.getProperty(GetHDFS.DIRECTORY).getValue());
            this.ignoreDottedFiles = processContext.getProperty(GetHDFS.IGNORE_DOTTED_FILES).asBoolean().booleanValue();
            String value = processContext.getProperty(GetHDFS.FILE_FILTER_REGEX).getValue();
            this.fileFilterPattern = value == null ? null : Pattern.compile(value);
            this.filterMatchBasenameOnly = processContext.getProperty(GetHDFS.FILTER_MATCH_NAME_ONLY).asBoolean().booleanValue();
            Long asTimePeriod = processContext.getProperty(GetHDFS.MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
            this.minimumAge = asTimePeriod == null ? 0L : asTimePeriod.longValue();
            Long asTimePeriod2 = processContext.getProperty(GetHDFS.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
            this.maximumAge = asTimePeriod2 == null ? Long.MAX_VALUE : asTimePeriod2.longValue();
            this.recurseSubdirs = processContext.getProperty(GetHDFS.RECURSE_SUBDIRS).asBoolean().booleanValue();
            this.pathFilter = new PathFilter() { // from class: org.apache.nifi.processors.hadoop.GetHDFS.ProcessorConfiguration.1
                public boolean accept(Path path) {
                    String name;
                    if (ProcessorConfiguration.this.ignoreDottedFiles && path.getName().startsWith(".")) {
                        return false;
                    }
                    if (ProcessorConfiguration.this.filterMatchBasenameOnly) {
                        name = path.getName();
                    } else {
                        String pathDifference = AbstractHadoopProcessor.getPathDifference(ProcessorConfiguration.this.configuredRootDirPath, path);
                        name = pathDifference.length() == 0 ? path.getName() : pathDifference + "/" + path.getName();
                    }
                    return ProcessorConfiguration.this.fileFilterPattern == null || ProcessorConfiguration.this.fileFilterPattern.matcher(name).matches();
                }
            };
        }

        public Path getConfiguredRootDirPath() {
            return this.configuredRootDirPath;
        }

        protected long getMinimumAge() {
            return this.minimumAge;
        }

        protected long getMaximumAge() {
            return this.maximumAge;
        }

        public boolean getRecurseSubdirs() {
            return this.recurseSubdirs;
        }

        protected PathFilter getPathFilter() {
            return this.pathFilter;
        }
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override // org.apache.nifi.processors.hadoop.AbstractHadoopProcessor
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return localProperties;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList(super.customValidate(validationContext));
        Long asTimePeriod = validationContext.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        Long asTimePeriod2 = validationContext.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        if ((asTimePeriod == null ? 0L : asTimePeriod.longValue()) > (asTimePeriod2 == null ? Long.MAX_VALUE : asTimePeriod2.longValue())) {
            arrayList.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration").explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
        }
        return arrayList;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        abstractOnScheduled(processContext);
        this.processorConfig = new ProcessorConfiguration(processContext);
        FileSystem fileSystem = getFileSystem();
        Path path = new Path(processContext.getProperty(DIRECTORY).getValue());
        if (!fileSystem.exists(path)) {
            throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + path + ". The directory does not exist.");
        }
        this.queueLock.lock();
        try {
            this.filePathQueue.clear();
            this.processing.clear();
            this.queueLock.unlock();
        } catch (Throwable th) {
            this.queueLock.unlock();
            throw th;
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        int intValue = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
        ArrayList arrayList = new ArrayList(intValue);
        Iterator it = processSession.get(10).iterator();
        while (it.hasNext()) {
            processSession.transfer((FlowFile) it.next(), REL_PASSTHROUGH);
        }
        if (this.filePathQueue.size() < 12500) {
            try {
                StopWatch stopWatch = new StopWatch(true);
                Set<Path> performListing = performListing(processContext);
                stopWatch.stop();
                long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                if (performListing != null) {
                    int i = 0;
                    this.queueLock.lock();
                    try {
                        try {
                            for (Path path : performListing) {
                                if (!this.filePathQueue.contains(path) && !this.processing.contains(path)) {
                                    if (!this.filePathQueue.offer(path)) {
                                        break;
                                    } else {
                                        i++;
                                    }
                                }
                            }
                            this.queueLock.unlock();
                        } finally {
                        }
                    } catch (Exception e) {
                        getLogger().warn("Could not add to processing queue due to {}", new Object[]{e});
                        this.queueLock.unlock();
                    }
                    if (performListing.size() > 0) {
                        this.logEmptyListing.set(3L);
                    }
                    if (this.logEmptyListing.getAndDecrement() > 0) {
                        getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", new Object[]{Long.valueOf(duration), Integer.valueOf(performListing.size()), Integer.valueOf(i)});
                    }
                }
            } catch (IOException e2) {
                processContext.yield();
                getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e2});
                return;
            }
        }
        this.queueLock.lock();
        try {
            this.filePathQueue.drainTo(arrayList, intValue);
            if (arrayList.isEmpty()) {
                processContext.yield();
                return;
            }
            this.processing.addAll(arrayList);
            this.queueLock.unlock();
            processBatchOfFiles(arrayList, processContext, processSession);
            this.queueLock.lock();
            try {
                this.processing.removeAll(arrayList);
                this.queueLock.unlock();
            } finally {
                this.queueLock.unlock();
            }
        } finally {
            this.queueLock.unlock();
        }
    }

    protected void processBatchOfFiles(List<Path> list, ProcessContext processContext, ProcessSession processSession) {
        String str;
        InputStream inputStream = null;
        Configuration configuration = getConfiguration();
        FileSystem fileSystem = getFileSystem();
        boolean booleanValue = processContext.getProperty(KEEP_SOURCE_FILE).asBoolean().booleanValue();
        Double asDataSize = processContext.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        int intValue = asDataSize != null ? asDataSize.intValue() : configuration.getInt("io.file.buffer.size", 4096);
        Path path = new Path(processContext.getProperty(DIRECTORY).getValue());
        AbstractHadoopProcessor.CompressionType valueOf = AbstractHadoopProcessor.CompressionType.valueOf(processContext.getProperty(COMPRESSION_CODEC).toString());
        boolean z = valueOf == AbstractHadoopProcessor.CompressionType.AUTOMATIC;
        CompressionCodec compressionCodec = (z || valueOf != AbstractHadoopProcessor.CompressionType.NONE) ? getCompressionCodec(processContext, getConfiguration()) : null;
        CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(configuration);
        for (Path path2 : list) {
            try {
                try {
                    if (fileSystem.exists(path2)) {
                        String name = path2.getName();
                        String pathDifference = getPathDifference(path, path2);
                        InputStream open = fileSystem.open(path2, intValue);
                        if (z) {
                            compressionCodec = compressionCodecFactory.getCodec(path2);
                        }
                        if (compressionCodec != null) {
                            open = compressionCodec.createInputStream(open);
                            str = StringUtils.removeEnd(name, compressionCodec.getDefaultExtension());
                        } else {
                            str = name;
                        }
                        FlowFile create = processSession.create();
                        StopWatch stopWatch = new StopWatch(true);
                        FlowFile importFrom = processSession.importFrom(open, create);
                        stopWatch.stop();
                        String calculateDataRate = stopWatch.calculateDataRate(importFrom.getSize());
                        long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                        FlowFile putAttribute = processSession.putAttribute(processSession.putAttribute(importFrom, CoreAttributes.PATH.key(), pathDifference), CoreAttributes.FILENAME.key(), str);
                        if (booleanValue || fileSystem.delete(path2, false)) {
                            processSession.getProvenanceReporter().receive(putAttribute, name.startsWith("/") ? "hdfs:/" + name : "hdfs://" + name);
                            processSession.transfer(putAttribute, REL_SUCCESS);
                            getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", new Object[]{putAttribute, path2, Long.valueOf(duration), calculateDataRate});
                            processSession.commit();
                            IOUtils.closeQuietly(open);
                            inputStream = null;
                        } else {
                            getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", new Object[]{path2});
                            processSession.remove(putAttribute);
                            IOUtils.closeQuietly(open);
                            inputStream = null;
                        }
                    } else {
                        IOUtils.closeQuietly(inputStream);
                        inputStream = null;
                    }
                } catch (Throwable th) {
                    getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{path2, th});
                    processSession.rollback();
                    processContext.yield();
                    IOUtils.closeQuietly(inputStream);
                    inputStream = null;
                }
            } catch (Throwable th2) {
                IOUtils.closeQuietly(inputStream);
                throw th2;
            }
        }
    }

    protected Set<Path> performListing(ProcessContext processContext) throws IOException {
        Set<Path> set = null;
        if (System.currentTimeMillis() >= this.lastPollTime.get() + processContext.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).longValue() && this.listingLock.tryLock()) {
            try {
                set = selectFiles(getFileSystem(), this.processorConfig.getConfiguredRootDirPath(), null);
                this.lastPollTime.set(System.currentTimeMillis());
                this.listingLock.unlock();
            } catch (Throwable th) {
                this.listingLock.unlock();
                throw th;
            }
        }
        return set;
    }

    protected Set<Path> selectFiles(FileSystem fileSystem, Path path, Set<Path> set) throws IOException {
        if (null == set) {
            set = new HashSet();
        }
        if (!fileSystem.exists(path)) {
            throw new IOException("Selection directory " + path.toString() + " doesn't appear to exist!");
        }
        HashSet hashSet = new HashSet();
        for (FileStatus fileStatus : fileSystem.listStatus(path)) {
            if (hashSet.size() >= 25000) {
                break;
            }
            Path path2 = fileStatus.getPath();
            if (set.add(path2)) {
                if (fileStatus.isDirectory() && this.processorConfig.getRecurseSubdirs()) {
                    hashSet.addAll(selectFiles(fileSystem, path2, set));
                } else if (!fileStatus.isDirectory() && this.processorConfig.getPathFilter().accept(path2)) {
                    long currentTimeMillis = System.currentTimeMillis() - fileStatus.getModificationTime();
                    if (this.processorConfig.getMinimumAge() < currentTimeMillis && currentTimeMillis < this.processorConfig.getMaximumAge()) {
                        hashSet.add(path2);
                        if (getLogger().isDebugEnabled()) {
                            getLogger().debug(this + " selected file at path: " + path2.toString());
                        }
                    }
                }
            }
        }
        return hashSet;
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_PASSTHROUGH);
        relationships = Collections.unmodifiableSet(hashSet);
        ArrayList arrayList = new ArrayList(properties);
        arrayList.add(DIRECTORY);
        arrayList.add(RECURSE_SUBDIRS);
        arrayList.add(KEEP_SOURCE_FILE);
        arrayList.add(FILE_FILTER_REGEX);
        arrayList.add(FILTER_MATCH_NAME_ONLY);
        arrayList.add(IGNORE_DOTTED_FILES);
        arrayList.add(MIN_AGE);
        arrayList.add(MAX_AGE);
        arrayList.add(POLLING_INTERVAL);
        arrayList.add(BATCH_SIZE);
        arrayList.add(BUFFER_SIZE);
        arrayList.add(COMPRESSION_CODEC);
        localProperties = Collections.unmodifiableList(arrayList);
    }
}
