package org.apache.nifi.processors.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.HDFSListing;
import org.apache.nifi.processors.hadoop.util.StringSerDe;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * Lists the files in an HDFS directory and emits one attribute-only FlowFile per newly seen file.
 * <p>
 * Progress is tracked as the newest modification time emitted so far plus the set of paths that share
 * that timestamp. Both are held in memory and mirrored to {@link Scope#CLUSTER} state, so that another
 * node (or this node after a restart) can resume without re-listing everything. Because the processor is
 * annotated {@link TriggerSerially}, {@link #onTrigger} never runs concurrently with itself.
 */
@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
        + "the HDFS file so that it can be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only "
        + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
        + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
@WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
        @WritesAttribute(attribute = "path", description = "The path is set to the absolute path of the file's directory on HDFS. "
                + "For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to "
                + "\"/tmp\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path "
                + "attribute will be set to \"/tmp/abc/1/2/3\"."),
        @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the file in HDFS"),
        @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the file in HDFS"),
        @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
        @WritesAttribute(attribute = "hdfs.length", description = "The number of bytes in the file in HDFS"),
        @WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for the file"),
        @WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
                + "3 for the group, and 3 for other users. For example rw-rw-r--")})
@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of HDFS files, the timestamp of the newest file is stored, along with the "
        + "filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after "
        + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only "
        + "and if a new Primary Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
public class ListHDFS extends AbstractHadoopProcessor {
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
                    + "begins pulling data, it won't duplicate all of the work that has been done.")
            .required(false)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
            .name(AbstractHadoopProcessor.DIRECTORY_PROP_NAME)
            .description("The HDFS directory from which files should be read")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
            .name("Recurse Subdirectories")
            .description("Indicates whether to list files from subdirectories of the HDFS directory")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles are transferred to this relationship")
            .build();

    /** Newest modification time known to have been listed, or null if nothing is known yet. */
    private volatile Long lastListingTime = null;
    /** Paths previously listed whose modification time equals {@link #lastListingTime}; used to break timestamp ties. */
    private volatile Set<Path> latestPathsListed = new HashSet<>();
    /** Set once this node has been told it was elected primary; causes cluster state to be adopted over local state. */
    private volatile boolean electedPrimaryNode = false;
    /** Set when the Directory property changes so that cluster-wide state is cleared the next time the processor is scheduled. */
    private volatile boolean resetState = false;

    @Override
    public void init(final ProcessorInitializationContext context) {
        super.init(context);
    }

    /**
     * Returns the legacy per-processor persistence file location. Not referenced within this class.
     */
    protected File getPersistenceFile() {
        return new File("conf/state/" + getIdentifier());
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(HADOOP_CONFIGURATION_RESOURCES);
        props.add(DISTRIBUTED_CACHE_SERVICE);
        props.add(DIRECTORY);
        props.add(RECURSE_SUBDIRS);
        props.add(KERBEROS_PRINCIPAL);
        props.add(KERBEROS_KEYTAB);
        props.add(KERBEROS_RELOGIN_PERIOD);
        return props;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        return relationships;
    }

    /**
     * Builds the Distributed Map Cache key under which a listing for {@code directory} was stored.
     */
    protected String getKey(final String directory) {
        return getIdentifier() + ".lastListingTime." + directory;
    }

    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
            this.electedPrimaryNode = true;
        }
    }

    /**
     * Forgets all listing progress when the Directory property changes at runtime.
     * The in-memory fields are reset immediately; cluster-wide state is cleared on the next schedule
     * (see {@link #moveStateToStateManager}), otherwise it would be re-adopted for the new directory.
     */
    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
            this.lastListingTime = null;
            this.latestPathsListed = new HashSet<>();
            this.resetState = true;
        }
    }

    /**
     * Parses a JSON-serialized {@link HDFSListing}, as stored in the Distributed Map Cache.
     */
    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
        final ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(mapper.readTree(serializedState), HDFSListing.class);
    }

    /**
     * Prepares cluster-wide state before the processor runs.
     * <ol>
     * <li>If the Directory changed since the last schedule, clears cluster state so the old directory's
     * progress is not applied to the new one.</li>
     * <li>If no cluster state exists, migrates any listing found in the (legacy) Distributed Map Cache.</li>
     * </ol>
     *
     * @throws IOException if state cannot be read, cleared or written
     */
    @OnScheduled
    public void moveStateToStateManager(final ProcessContext context) throws IOException {
        if (resetState) {
            context.getStateManager().clear(Scope.CLUSTER);
            resetState = false;
        }

        if (context.getStateManager().getState(Scope.CLUSTER).getVersion() != -1) {
            return;
        }

        final HDFSListing serviceListing = getListingFromService(context);
        if (serviceListing != null) {
            context.getStateManager().setState(serviceListing.toMap(), Scope.CLUSTER);
        }
    }

    /**
     * Fetches the listing for the configured directory from the Distributed Map Cache, if one is configured.
     *
     * @return the stored listing, or null if no service is configured, nothing is stored, or the content is unreadable
     */
    private HDFSListing getListingFromService(final ProcessContext context) throws IOException {
        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        if (client == null) {
            return null;
        }

        final String key = getKey(context.getProperty(DIRECTORY).getValue());
        final String serialized = client.get(key, new StringSerDe(), new StringSerDe());
        if (serialized == null) {
            return null;
        }

        try {
            return deserialize(serialized);
        } catch (final Exception e) {
            getLogger().error("Failed to retrieve state from Distributed Map Cache because the content that was retrieved could not be understood", e);
            return null;
        }
    }

    /**
     * Reconciles local progress with the cluster-wide listing and returns the lower bound for this listing.
     * <p>
     * The local values win when they are newer than the remote listing. Otherwise the remote listing is adopted
     * when this node has no local progress or has been elected primary. The returned value is always the one
     * now held in {@link #lastListingTime}, so it agrees with {@link #latestPathsListed}.
     *
     * @param directory     the configured directory (currently unused)
     * @param remoteListing the cluster-wide listing, or null if none exists
     * @return the minimum modification time to list from, or null to list everything
     */
    private Long getMinTimestamp(final String directory, final HDFSListing remoteListing) throws IOException {
        if (remoteListing == null) {
            return this.lastListingTime;
        }

        final Long localTimestamp = this.lastListingTime;
        final long remoteTimestamp = remoteListing.getLatestTimestamp().getTime();
        if (localTimestamp != null && localTimestamp > remoteTimestamp) {
            return localTimestamp;
        }

        if (localTimestamp == null || this.electedPrimaryNode) {
            this.latestPathsListed = remoteListing.toPaths();
            this.lastListingTime = remoteTimestamp;
        }

        // Return the value actually in effect; returning the stale local value (null on a fresh node)
        // would re-list everything the cluster state says was already listed.
        return this.lastListingTime;
    }

    /**
     * Lists the configured directory and emits a FlowFile for each file not yet listed, then records the newest
     * emitted timestamp (and the paths sharing it) locally and in cluster-wide state.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final String directory = context.getProperty(DIRECTORY).getValue();

        final Long minTimestamp;
        try {
            final StateMap state = context.getStateManager().getState(Scope.CLUSTER);
            final Map<String, String> stateValues = state.toMap();
            // A cleared state map may carry a version but no values; treat that the same as "no state".
            final boolean noState = state.getVersion() == -1 || stateValues == null || stateValues.isEmpty();
            minTimestamp = getMinTimestamp(directory, noState ? null : HDFSListing.fromMap(stateValues));
        } catch (final IOException ioe) {
            getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. "
                    + "Will not perform listing until this is accomplished.", ioe);
            context.yield();
            return;
        }

        final Set<FileStatus> statuses;
        try {
            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
            statuses = getStatuses(new Path(directory), recursive, getFileSystem());
        } catch (final IOException ioe) {
            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{ioe});
            // Back off so that a persistent failure does not cause a tight trigger loop (@TriggerWhenEmpty).
            context.yield();
            return;
        }

        int listedCount = 0;
        long latestListedTimestamp = Long.MIN_VALUE;
        for (final FileStatus status : statuses) {
            if (!isListable(status, minTimestamp)) {
                continue;
            }
            session.transfer(session.putAllAttributes(session.create(), createAttributes(status)), REL_SUCCESS);
            listedCount++;
            latestListedTimestamp = Math.max(latestListedTimestamp, status.getModificationTime());
        }

        if (listedCount == 0) {
            getLogger().debug("There is no data to list. Yielding.");
            context.yield();
            if (this.lastListingTime == null) {
                this.lastListingTime = 0L;
            }
            return;
        }

        getLogger().info("Successfully created listing with {} new files from HDFS", new Object[]{listedCount});
        session.commit();

        try {
            context.getStateManager().setState(createListing(latestListedTimestamp, statuses).toMap(), Scope.CLUSTER);
        } catch (final IOException e) {
            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", e);
        }

        // Only paths at the newest timestamp are needed for the tie-break on the next run.
        final Set<Path> latestPaths = new HashSet<>();
        for (final FileStatus status : statuses) {
            if (status.getModificationTime() == latestListedTimestamp) {
                latestPaths.add(status.getPath());
            }
        }
        this.lastListingTime = latestListedTimestamp;
        this.latestPathsListed = latestPaths;
    }

    /**
     * Decides whether a file should be emitted in this listing.
     * Files still being copied (name ending in {@code _COPYING_}) are skipped. A file is listed when it is newer
     * than {@code minTimestamp}, or has exactly that timestamp but was not among the paths listed at it.
     */
    private boolean isListable(final FileStatus status, final Long minTimestamp) {
        if (status.getPath().getName().endsWith("_COPYING_")) {
            return false;
        }
        if (minTimestamp == null) {
            return true;
        }
        final long modificationTime = status.getModificationTime();
        return modificationTime > minTimestamp
                || (modificationTime == minTimestamp && !latestPathsListed.contains(status.getPath()));
    }

    /**
     * Collects the non-directory entries under {@code path}, descending into subdirectories when {@code recurse} is true.
     * A failure to list a subdirectory is logged and skipped; a failure to list {@code path} itself is thrown.
     */
    private Set<FileStatus> getStatuses(final Path path, final boolean recurse, final FileSystem hdfs) throws IOException {
        final Set<FileStatus> statuses = new HashSet<>();
        for (final FileStatus status : hdfs.listStatus(path)) {
            if (!status.isDirectory()) {
                statuses.add(status);
            } else if (recurse) {
                try {
                    statuses.addAll(getStatuses(status.getPath(), recurse, hdfs));
                } catch (final IOException ioe) {
                    getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[]{status.getPath(), ioe});
                }
            }
        }
        return statuses;
    }

    /**
     * Builds the state to persist: the newest timestamp plus the URIs of every listed path that has exactly that
     * modification time (the only paths the next run's tie-break needs).
     */
    private HDFSListing createListing(final long latestListingModTime, final Set<FileStatus> statuses) {
        final Set<String> paths = new HashSet<>();
        for (final FileStatus status : statuses) {
            if (status.getModificationTime() == latestListingModTime) {
                paths.add(status.getPath().toUri().toString());
            }
        }

        final HDFSListing listing = new HDFSListing();
        listing.setLatestTimestamp(new Date(latestListingModTime));
        listing.setMatchingPaths(paths);
        return listing;
    }

    /**
     * Returns the scheme-less absolute path of {@code path}, e.g. {@code /tmp/abc} for {@code hdfs://host/tmp/abc}.
     */
    private String getAbsolutePath(final Path path) {
        final Path parent = path.getParent();
        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
        return prefix + "/" + path.getName();
    }

    /**
     * Builds the FlowFile attributes describing {@code status}; keys match the {@code @WritesAttributes} documentation.
     */
    private Map<String, String> createAttributes(final FileStatus status) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
        attributes.put("hdfs.owner", status.getOwner());
        attributes.put("hdfs.group", status.getGroup());
        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
        attributes.put("hdfs.length", String.valueOf(status.getLen()));
        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));

        final FsPermission permission = status.getPermission();
        attributes.put("hdfs.permissions", getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction()));
        return attributes;
    }

    /**
     * Renders an {@link FsAction} as the three-character {@code rwx} form, using {@code -} for missing permissions.
     */
    private String getPerms(final FsAction action) {
        final StringBuilder sb = new StringBuilder(3);
        sb.append(action.implies(FsAction.READ) ? "r" : "-");
        sb.append(action.implies(FsAction.WRITE) ? "w" : "-");
        sb.append(action.implies(FsAction.EXECUTE) ? "x" : "-");
        return sb.toString();
    }
}
