package org.apache.nifi.flow.resource.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.flow.resource.ExternalResourceDescriptor;
import org.apache.nifi.flow.resource.ExternalResourceProvider;
import org.apache.nifi.flow.resource.ExternalResourceProviderInitializationContext;
import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
import org.apache.nifi.processors.hadoop.HDFSResourceHelper;
import org.apache.nifi.processors.hadoop.HdfsResources;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ExternalResourceProvider} implementation that lists and fetches resources from a single
 * directory of a Hadoop-compatible file system.
 *
 * <p>The provider is configured through the properties of the initialization context:
 * <ul>
 *   <li>{@code resources} - comma separated list of Hadoop configuration resources (mandatory)</li>
 *   <li>{@code source.directory} - directory the resources are read from (mandatory)</li>
 *   <li>{@code storage.location} - optional value written into {@code fs.defaultFS}</li>
 *   <li>{@code kerberos.principal}, {@code kerberos.keytab}, {@code kerberos.password} - read only
 *       when the Hadoop configuration has security enabled</li>
 * </ul>
 *
 * <p>A new {@link FileSystem} instance is created for every call (the Hadoop file system cache is
 * disabled for the used scheme), so every code path is responsible for closing the instance it created.
 */
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSExternalResourceProvider implements ExternalResourceProvider {
    private static final String RESOURCES_PARAMETER = "resources";
    private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
    private static final String STORAGE_LOCATION = "storage.location";
    private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
    private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
    private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";
    private static final String STORAGE_LOCATION_PROPERTY = "fs.defaultFS";
    private static final int BUFFER_SIZE_DEFAULT = 4096;
    /** Timeout, in milliseconds, of the connectivity probe performed before a file system is created. */
    private static final int CONNECT_TIMEOUT_MILLIS = 1000;
    private static final String DELIMITER = "/";
    private volatile ExternalResourceProviderInitializationContext context;
    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSExternalResourceProvider.class);
    /** Guards the login and file system creation, which is shared by all instances of this class. */
    private static final Object RESOURCES_LOCK = new Object();
    private volatile List<String> resources = null;
    private volatile Path sourceDirectory = null;
    private volatile String storageLocation = null;
    private volatile boolean initialized = false;

    /**
     * Reads and validates the provider configuration.
     *
     * <p>The fields of the provider are only assigned after every mandatory property has been
     * validated, so a failed call leaves the provider in its previous state.
     *
     * @param externalResourceProviderInitializationContext context carrying the configuration properties and the resource filter
     * @throws NullPointerException if the {@code resources} property is missing
     * @throws IllegalArgumentException if no configuration resource is listed or the source directory is missing or empty
     */
    public void initialize(ExternalResourceProviderInitializationContext externalResourceProviderInitializationContext) {
        final String resourcesValue = Objects.requireNonNull(
                externalResourceProviderInitializationContext.getProperties().get(RESOURCES_PARAMETER),
                "The '" + RESOURCES_PARAMETER + "' property is mandatory");
        final List<String> configurationResources = Arrays.stream(resourcesValue.split(","))
                .map(String::trim)
                .filter(resource -> !resource.isEmpty())
                .collect(Collectors.toList());
        if (configurationResources.isEmpty()) {
            throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
        }

        final String sourceDirectoryValue = externalResourceProviderInitializationContext.getProperties().get(SOURCE_DIRECTORY_PARAMETER);
        if (sourceDirectoryValue == null || sourceDirectoryValue.isEmpty()) {
            throw new IllegalArgumentException("Provider needs the source directory to be set");
        }

        this.resources = configurationResources;
        this.sourceDirectory = new Path(sourceDirectoryValue);
        this.storageLocation = externalResourceProviderInitializationContext.getProperties().get(STORAGE_LOCATION);
        this.context = externalResourceProviderInitializationContext;
        // Written last: the volatile write publishes the fields assigned above.
        this.initialized = true;
    }

    /**
     * Lists the plain files of the source directory that are accepted by the filter of the context.
     *
     * @return descriptors (file name and modification time) of the accepted files
     * @throws IOException if the listing fails or the calling thread is interrupted
     * @throws IllegalStateException if the provider has not been initialized
     */
    public Collection<ExternalResourceDescriptor> listResources() throws IOException {
        if (!this.initialized) {
            throw new IllegalStateException("Provider is not initialized");
        }
        final HdfsResources hdfsResources = getHdfsResources();
        try {
            // The cast selects the PrivilegedExceptionAction overload of doAs.
            final FileStatus[] statuses = hdfsResources.getUserGroupInformation().doAs(
                    (PrivilegedExceptionAction<FileStatus[]>) () -> hdfsResources.getFileSystem().listStatus(this.sourceDirectory));
            final List<ExternalResourceDescriptor> descriptors = Arrays.stream(statuses)
                    .filter(FileStatus::isFile)
                    .map(HDFSExternalResourceProvider::convertStatusToDescriptor)
                    .filter(this.context.getFilter())
                    .collect(Collectors.toList());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The following NARs were found: {}", descriptors.stream()
                        .map(ExternalResourceDescriptor::getLocation)
                        .collect(Collectors.joining(", ")));
            }
            return descriptors;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Provider cannot list resources", e);
        } finally {
            HDFSResourceHelper.closeFileSystem(hdfsResources.getFileSystem());
        }
    }

    /** Builds a descriptor from the file name and the modification time of the given status. */
    private static ExternalResourceDescriptor convertStatusToDescriptor(FileStatus fileStatus) {
        return new ImmutableExternalResourceDescriptor(fileStatus.getPath().getName(), fileStatus.getModificationTime());
    }

    /**
     * Opens the resource identified by the descriptor for reading.
     *
     * <p>On success the file system is handed over to the returned {@code HDFSResourceInputStream}
     * (presumably closed together with the stream - confirm against that class). On every failure
     * the file system created for this call is closed here so it does not leak.
     *
     * @param externalResourceDescriptor descriptor whose location is resolved against the source directory
     * @return stream providing the content of the resource
     * @throws IOException if the file does not exist, cannot be opened, or the calling thread is interrupted
     * @throws IllegalStateException if the provider has not been initialized
     */
    public InputStream fetchExternalResource(ExternalResourceDescriptor externalResourceDescriptor) throws IOException {
        if (!this.initialized) {
            throw new IllegalStateException("Provider is not initialized");
        }
        final String location = externalResourceDescriptor.getLocation();
        final Path path = getLocation(location);
        final HdfsResources hdfsResources = getHdfsResources();
        final FileSystem fileSystem = hdfsResources.getFileSystem();
        boolean handedOver = false;
        try {
            // The cast selects the PrivilegedExceptionAction overload of doAs.
            final FSDataInputStream content = hdfsResources.getUserGroupInformation().doAs(
                    (PrivilegedExceptionAction<FSDataInputStream>) () -> {
                        if (!fileSystem.exists(path)) {
                            throw new IOException("Cannot find file in HDFS at location " + location);
                        }
                        return fileSystem.open(path, BUFFER_SIZE_DEFAULT);
                    });
            final InputStream result = new HDFSResourceInputStream(fileSystem, content);
            handedOver = true;
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Error during acquiring file", e);
        } finally {
            if (!handedOver) {
                HDFSResourceHelper.closeFileSystem(fileSystem);
            }
        }
    }

    /** Resolves a resource location to a path directly under the source directory. */
    private Path getLocation(String str) {
        String directory = this.sourceDirectory.toString();
        if (!directory.endsWith(DELIMITER)) {
            directory = directory + DELIMITER;
        }
        return new Path(directory + str);
    }

    /**
     * Builds the Hadoop configuration, logs in and creates a new, non-cached file system.
     *
     * <p>The caller owns the returned file system and must close it. If anything fails after the
     * file system has been created, it is closed before the exception is propagated.
     *
     * @return the configuration, file system, UGI and (for secured clusters) Kerberos user
     * @throws IOException if the file system is unreachable, the login fails or the file system cannot be created
     * @throws IllegalArgumentException if the source directory does not exist
     */
    private HdfsResources getHdfsResources() throws IOException {
        final ExtendedConfiguration configuration = new ExtendedConfiguration(LOGGER);
        configuration.setClassLoader(Thread.currentThread().getContextClassLoader());
        for (final String resource : this.resources) {
            configuration.addResource(new Path(resource));
        }
        if (this.storageLocation != null) {
            configuration.set(STORAGE_LOCATION_PROPERTY, this.storageLocation);
        }

        // Fail fast when the file system endpoint is unreachable.
        checkHdfsUriForTimeout(configuration);

        // Disable the file system cache for the scheme, so FileSystem.get returns a dedicated instance.
        configuration.set(String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(configuration).getScheme()), "true");

        final UserGroupInformation userGroupInformation;
        final KerberosUser kerberosUser;
        final FileSystem fileSystem;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(configuration)) {
                kerberosUser = createKerberosUser();
                userGroupInformation = SecurityUtil.getUgiForKerberosUser(configuration, kerberosUser);
            } else {
                configuration.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                configuration.set("hadoop.security.authentication", "simple");
                userGroupInformation = SecurityUtil.loginSimple(configuration);
                kerberosUser = null;
            }
            fileSystem = getFileSystemAsUser(configuration, userGroupInformation);
        }

        boolean succeeded = false;
        try {
            LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", userGroupInformation, kerberosUser);
            final Path workingDirectory = fileSystem.getWorkingDirectory();
            LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                    workingDirectory, fileSystem.getDefaultBlockSize(workingDirectory), fileSystem.getDefaultReplication(workingDirectory), configuration);
            if (!fileSystem.exists(this.sourceDirectory)) {
                throw new IllegalArgumentException("Source directory is not existing");
            }
            final HdfsResources hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, kerberosUser);
            succeeded = true;
            return hdfsResources;
        } finally {
            if (!succeeded) {
                // Nobody else holds a reference to the file system at this point.
                HDFSResourceHelper.closeFileSystem(fileSystem);
            }
        }
    }

    /**
     * Creates the Kerberos user from the context properties, preferring the keytab over the password.
     *
     * @throws IOException if neither a keytab nor a password is configured
     */
    private KerberosUser createKerberosUser() throws IOException {
        final String principal = this.context.getProperties().get(KERBEROS_PRINCIPAL_PARAMETER);
        final String keytab = this.context.getProperties().get(KERBEROS_KEYTAB_PARAMETER);
        final String password = this.context.getProperties().get(KERBEROS_PASSWORD_PARAMETER);
        if (keytab != null) {
            return new KerberosKeytabUser(principal, keytab);
        }
        if (password != null) {
            return new KerberosPasswordUser(principal, password);
        }
        throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
    }

    /**
     * Probes the authority of the default file system URI with a short connect timeout.
     * Nothing is checked when the URI has no authority or no explicit port.
     *
     * @throws IOException if the connection cannot be established
     */
    private void checkHdfsUriForTimeout(Configuration configuration) throws IOException {
        final URI defaultUri = FileSystem.getDefaultUri(configuration);
        final String authority = defaultUri.getAuthority();
        final int port = defaultUri.getPort();
        if (authority == null || authority.isEmpty() || port < 0) {
            return;
        }
        final InetSocketAddress address = NetUtils.createSocketAddr(authority, port);
        Socket socket = null;
        try {
            socket = NetUtils.getDefaultSocketFactory(configuration).createSocket();
            NetUtils.connect(socket, address, CONNECT_TIMEOUT_MILLIS);
        } finally {
            IOUtils.closeQuietly(socket);
        }
    }

    /**
     * Creates the file system on behalf of the given user.
     *
     * @throws IOException if the file system cannot be created or the calling thread is interrupted
     */
    private FileSystem getFileSystemAsUser(final Configuration configuration, UserGroupInformation userGroupInformation) throws IOException {
        try {
            return userGroupInformation.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(configuration));
        } catch (InterruptedException e) {
            // Restore the interrupt status, consistently with the other privileged calls of this class.
            Thread.currentThread().interrupt();
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }
}
