package org.apache.nifi.processors.hadoop;

import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Rename existing files or a directory of files (non-recursive) on Hadoop Distributed File System (HDFS).")
@Restricted(restrictions = {@Restriction(requiredPermission = RequiredPermission.READ_DISTRIBUTED_FILESYSTEM, explanation = "Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), @Restriction(requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM, explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")})
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."), @WritesAttribute(attribute = MoveHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE, description = "The absolute path to the file on HDFS is stored in this attribute.")})
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"hadoop", "HCFS", "HDFS", "put", "move", "filesystem", "moveHDFS"})
@SeeAlso({PutHDFS.class, GetHDFS.class})
/* loaded from: input_file:org/apache/nifi/processors/hadoop/MoveHDFS.class */
public class MoveHDFS extends AbstractHadoopProcessor {
    public static final String REPLACE_RESOLUTION = "replace";
    public static final String IGNORE_RESOLUTION = "ignore";
    public static final String FAIL_RESOLUTION = "fail";
    private static final Set<Relationship> relationships;
    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
    protected ProcessorConfiguration processorConfig;
    private final AtomicLong logEmptyListing = new AtomicLong(2);
    private final Lock listingLock = new ReentrantLock();
    private final Lock queueLock = new ReentrantLock();
    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue();
    private final BlockingQueue<Path> processing = new LinkedBlockingQueue();
    public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue("replace", "replace", "Replaces the existing file if any.");
    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue("ignore", "ignore", "Failed rename operation stops processing and routes to success.");
    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue("fail", "fail", "Failing to rename a file routes to failure.");
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully renamed on HDFS are transferred to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be renamed on HDFS are transferred to this relationship").build();
    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder().name("Conflict Resolution Strategy").description("Indicates what should happen when a file with the same name already exists in the output directory").required(true).defaultValue(FAIL_RESOLUTION_AV.getValue()).allowableValues(new AllowableValue[]{REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV}).build();
    public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder().name("File Filter Regex").description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched").required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder().name("Ignore Dotted Files").description("If true, files whose names begin with a dot (\".\") will be ignored").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder().name("Input Directory or File").description("The HDFS directory from which files should be read, or a single file to read.").defaultValue("${path}").required(true).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory").description("The HDFS directory where the files will be moved to").required(true).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation").description("The operation that will be performed on the source file").required(true).allowableValues(new String[]{"move", "copy"}).defaultValue("move").build();
    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner").description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group").description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/hadoop/MoveHDFS$ProcessorConfiguration.class */
    public static class ProcessorConfiguration {
        private final String conflictResolution;
        private final String operation;
        private final Pattern fileFilterPattern;
        private final boolean ignoreDottedFiles;

        ProcessorConfiguration(ProcessContext processContext) {
            this.conflictResolution = processContext.getProperty(MoveHDFS.CONFLICT_RESOLUTION).getValue();
            this.operation = processContext.getProperty(MoveHDFS.OPERATION).getValue();
            String value = processContext.getProperty(MoveHDFS.FILE_FILTER_REGEX).getValue();
            this.fileFilterPattern = value == null ? null : Pattern.compile(value);
            this.ignoreDottedFiles = processContext.getProperty(MoveHDFS.IGNORE_DOTTED_FILES).asBoolean().booleanValue();
        }

        public String getOperation() {
            return this.operation;
        }

        public String getConflictResolution() {
            return this.conflictResolution;
        }

        protected PathFilter getPathFilter(final Path path) {
            return new PathFilter() { // from class: org.apache.nifi.processors.hadoop.MoveHDFS.ProcessorConfiguration.1
                public boolean accept(Path path2) {
                    if (ProcessorConfiguration.this.ignoreDottedFiles && path2.getName().startsWith(".")) {
                        return false;
                    }
                    String pathDifference = AbstractHadoopProcessor.getPathDifference(path, path2);
                    return ProcessorConfiguration.this.fileFilterPattern == null || ProcessorConfiguration.this.fileFilterPattern.matcher(pathDifference.length() == 0 ? path2.getName() : new StringBuilder().append(pathDifference).append("/").append(path2.getName()).toString()).matches();
                }
            };
        }
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(this.properties);
        arrayList.add(CONFLICT_RESOLUTION);
        arrayList.add(INPUT_DIRECTORY_OR_FILE);
        arrayList.add(OUTPUT_DIRECTORY);
        arrayList.add(OPERATION);
        arrayList.add(FILE_FILTER_REGEX);
        arrayList.add(IGNORE_DOTTED_FILES);
        arrayList.add(REMOTE_OWNER);
        arrayList.add(REMOTE_GROUP);
        return arrayList;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws Exception {
        super.abstractOnScheduled(processContext);
        this.processorConfig = new ProcessorConfiguration(processContext);
        this.queueLock.lock();
        try {
            this.filePathQueue.clear();
            this.processing.clear();
        } finally {
            this.queueLock.unlock();
        }
    }

    /* JADX WARN: Finally extract failed */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null && processContext.hasIncomingConnection()) {
            return;
        }
        FlowFile create = flowFile != null ? flowFile : processSession.create();
        FileSystem fileSystem = getFileSystem();
        String value = processContext.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(create).getValue();
        try {
            Path path = new Path(value);
            if (!fileSystem.exists(path)) {
                throw new IOException("Input Directory or File does not exist in HDFS");
            }
            ArrayList arrayList = new ArrayList();
            try {
                StopWatch stopWatch = new StopWatch(true);
                Set<Path> performListing = performListing(processContext, path);
                stopWatch.stop();
                long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                if (performListing != null) {
                    int i = 0;
                    this.queueLock.lock();
                    try {
                        try {
                            for (Path path2 : performListing) {
                                if (!this.filePathQueue.contains(path2) && !this.processing.contains(path2)) {
                                    if (!this.filePathQueue.offer(path2)) {
                                        break;
                                    } else {
                                        i++;
                                    }
                                }
                            }
                            this.queueLock.unlock();
                        } catch (Exception e) {
                            getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage()}, e);
                            this.queueLock.unlock();
                        }
                        if (performListing.size() > 0) {
                            this.logEmptyListing.set(3L);
                        }
                        if (this.logEmptyListing.getAndDecrement() > 0) {
                            getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", new Object[]{Long.valueOf(duration), Integer.valueOf(performListing.size()), Integer.valueOf(i)});
                        }
                    } catch (Throwable th) {
                        throw th;
                    }
                }
                this.queueLock.lock();
                try {
                    this.filePathQueue.drainTo(arrayList);
                    if (arrayList.isEmpty()) {
                        processSession.remove(create);
                        processContext.yield();
                        this.queueLock.unlock();
                        return;
                    }
                    this.queueLock.unlock();
                    processBatchOfFiles(arrayList, processContext, processSession, create);
                    this.queueLock.lock();
                    try {
                        this.processing.removeAll(arrayList);
                        this.queueLock.unlock();
                        processSession.remove(create);
                    } finally {
                        this.queueLock.unlock();
                    }
                } finally {
                    this.queueLock.unlock();
                }
            } catch (IOException e2) {
                processContext.yield();
                getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e2});
            }
        } catch (Exception e3) {
            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{value, create, e3});
            processSession.transfer(processSession.penalize(processSession.putAttribute(create, "hdfs.failure.reason", e3.getMessage())), REL_FAILURE);
        }
    }

    protected void processBatchOfFiles(List<Path> list, final ProcessContext processContext, final ProcessSession processSession, final FlowFile flowFile) {
        Preconditions.checkState(flowFile != null, "No parent flowfile for this batch was provided");
        final Configuration configuration = getConfiguration();
        final FileSystem fileSystem = getFileSystem();
        UserGroupInformation userGroupInformation = getUserGroupInformation();
        if (configuration == null || userGroupInformation == null) {
            getLogger().error("Configuration or UserGroupInformation not configured properly");
            processSession.transfer(flowFile, REL_FAILURE);
            processContext.yield();
        } else {
            for (final Path path : list) {
                userGroupInformation.doAs(new PrivilegedAction<Object>() { // from class: org.apache.nifi.processors.hadoop.MoveHDFS.1
                    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
                    /* JADX WARN: Failed to find 'out' block for switch in B:17:0x00bb. Please report as an issue. */
                    @Override // java.security.PrivilegedAction
                    public Object run() {
                        FlowFile create = processSession.create(flowFile);
                        try {
                            String name = path.getName();
                            Path path2 = new Path(processContext.getProperty(MoveHDFS.OUTPUT_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
                            Path path3 = new Path(path2, name);
                            if (fileSystem.exists(path3)) {
                                String conflictResolution = MoveHDFS.this.processorConfig.getConflictResolution();
                                boolean z = -1;
                                switch (conflictResolution.hashCode()) {
                                    case -1190396462:
                                        if (conflictResolution.equals("ignore")) {
                                            z = true;
                                            break;
                                        }
                                        break;
                                    case 3135262:
                                        if (conflictResolution.equals("fail")) {
                                            z = 2;
                                            break;
                                        }
                                        break;
                                    case 1094496948:
                                        if (conflictResolution.equals("replace")) {
                                            z = false;
                                            break;
                                        }
                                        break;
                                }
                                switch (z) {
                                    case false:
                                        if (fileSystem.delete(path3, false)) {
                                            MoveHDFS.this.getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{path3, create});
                                            break;
                                        }
                                        break;
                                    case true:
                                        processSession.transfer(create, MoveHDFS.REL_SUCCESS);
                                        MoveHDFS.this.getLogger().info("transferring {} to success because file with same name already exists", new Object[]{create});
                                        return null;
                                    case true:
                                        processSession.transfer(processSession.penalize(create), MoveHDFS.REL_FAILURE);
                                        MoveHDFS.this.getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{create});
                                        return null;
                                }
                            }
                            try {
                            } catch (FileNotFoundException e) {
                                if (!fileSystem.mkdirs(path2)) {
                                    throw new IOException(path2.toString() + " could not be created");
                                }
                                MoveHDFS.this.changeOwner(processContext, fileSystem, path2);
                            }
                            if (!fileSystem.getFileStatus(path2).isDirectory()) {
                                throw new IOException(path2.toString() + " already exists and is not a directory");
                            }
                            boolean z2 = false;
                            int i = 0;
                            while (true) {
                                if (i < 10) {
                                    if (MoveHDFS.this.processorConfig.getOperation().equals("move")) {
                                        if (fileSystem.rename(path, path3)) {
                                            z2 = true;
                                        } else {
                                            Thread.sleep(200L);
                                            i++;
                                        }
                                    } else if (FileUtil.copy(fileSystem, path, fileSystem, path3, false, configuration)) {
                                        z2 = true;
                                    } else {
                                        Thread.sleep(200L);
                                        i++;
                                    }
                                }
                            }
                            if (!z2) {
                                throw new ProcessException("Could not move file " + path + " to its final filename");
                            }
                            MoveHDFS.this.changeOwner(processContext, fileSystem, path3);
                            String path4 = path3.toString();
                            FlowFile putAttribute = processSession.putAttribute(processSession.putAttribute(create, CoreAttributes.FILENAME.key(), path3.getName()), MoveHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE, path3.getParent().toString());
                            processSession.getProvenanceReporter().send(putAttribute, path4.startsWith("/") ? "hdfs:/" + path4 : "hdfs://" + path4);
                            processSession.transfer(putAttribute, MoveHDFS.REL_SUCCESS);
                            return null;
                        } catch (Throwable th) {
                            MoveHDFS.this.getLogger().error("Failed to rename on HDFS due to {}", new Object[]{th});
                            processSession.transfer(processSession.penalize(create), MoveHDFS.REL_FAILURE);
                            processContext.yield();
                            return null;
                        }
                    }
                });
            }
        }
    }

    protected Set<Path> performListing(ProcessContext processContext, Path path) throws IOException {
        Set<Path> set = null;
        if (this.listingLock.tryLock()) {
            try {
                set = selectFiles(getFileSystem(), path, null);
                this.listingLock.unlock();
            } catch (Throwable th) {
                this.listingLock.unlock();
                throw th;
            }
        }
        return set;
    }

    protected void changeOwner(ProcessContext processContext, FileSystem fileSystem, Path path) {
        try {
            String value = processContext.getProperty(REMOTE_OWNER).getValue();
            String value2 = processContext.getProperty(REMOTE_GROUP).getValue();
            if (value != null || value2 != null) {
                fileSystem.setOwner(path, value, value2);
            }
        } catch (Exception e) {
            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{path, e.getMessage()}, e);
        }
    }

    protected Set<Path> selectFiles(FileSystem fileSystem, Path path, Set<Path> set) throws IOException {
        if (null == set) {
            set = new HashSet();
        }
        if (!fileSystem.exists(path)) {
            throw new IOException("Selection directory " + path.toString() + " doesn't appear to exist!");
        }
        HashSet hashSet = new HashSet();
        FileStatus fileStatus = fileSystem.getFileStatus(path);
        if (fileStatus.isDirectory()) {
            for (FileStatus fileStatus2 : fileSystem.listStatus(path)) {
                Path path2 = fileStatus2.getPath();
                if (set.add(path2) && !fileStatus2.isDirectory() && this.processorConfig.getPathFilter(path).accept(path2)) {
                    hashSet.add(path2);
                    if (getLogger().isDebugEnabled()) {
                        getLogger().debug(this + " selected file at path: " + path2.toString());
                    }
                }
            }
        } else if (fileStatus.isFile()) {
            hashSet.add(path);
        }
        return hashSet;
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
