package org.apache.nifi.processors.hadoop;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Throwables;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

/**
 * Writes the content of each incoming FlowFile to a file in HDFS.
 *
 * <p>The target directory comes from the {@code Directory} property and is created when missing. The file
 * name comes from the {@code filename} attribute, plus the codec's default extension when a compression
 * codec is configured. An existing file with the same name is handled according to the Conflict Resolution
 * Strategy. The Writing Strategy decides whether data goes to a temporary dot-file that is renamed
 * afterwards, or directly to the final path.
 */
@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
@Restricted(restrictions = {
        @Restriction(
                requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
                explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
})
@WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
        @WritesAttribute(attribute = MoveHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE, description = "The absolute path to the file on HDFS is stored in this attribute."),
        @WritesAttribute(attribute = PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE, description = "The result(true/false) indicates if the folder is created by the processor.")
})
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "HCFS", "HDFS", "put", "copy", "filesystem"})
@SeeAlso({GetHDFS.class})
public class PutHDFS extends AbstractHadoopProcessor {

    /** Hadoop configuration key consulted for the IO buffer size when {@link #BUFFER_SIZE} is not set. */
    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    /** IO buffer size in bytes used when neither the property nor the Hadoop configuration supplies one. */
    protected static final int BUFFER_SIZE_DEFAULT = 4096;

    /** Attribute recording whether this processor had to create the target directory ("true"/"false"). */
    protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";

    /** Maximum number of attempts to rename the temporary dot-file to its final name. */
    private static final int RENAME_ATTEMPTS = 10;
    /** Pause between rename attempts, in milliseconds. */
    private static final long RENAME_RETRY_DELAY_MILLIS = 200L;

    /** Maximum number of directory ACLs kept in {@link #aclCache}. */
    private static final long ACL_CACHE_MAX_ENTRIES = 20L;
    /** How long a cached directory ACL stays valid after it was loaded. */
    private static final Duration ACL_CACHE_TTL = Duration.ofHours(1L);

    protected static final String REPLACE_RESOLUTION = "replace";
    protected static final String IGNORE_RESOLUTION = "ignore";
    protected static final String FAIL_RESOLUTION = "fail";
    protected static final String APPEND_RESOLUTION = "append";

    protected static final String WRITE_AND_RENAME = "writeAndRename";
    protected static final String SIMPLE_WRITE = "simpleWrite";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Files that have been successfully written to HDFS are transferred to this relationship")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Files that could not be written to HDFS for some reason are transferred to this relationship")
            .build();

    private static final Set<Relationship> relationships;

    static {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, REPLACE_RESOLUTION, "Replaces the existing file if any.");
    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, "Ignores the flow file and routes it to success.");
    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, "Penalizes the flow file and routes it to failure.");
    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, "Appends to the existing file if any, creates a new file otherwise.");

    protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename", "The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
    protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write", "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");

    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
            .name("Conflict Resolution Strategy")
            .description("Indicates what should happen when a file with the same name already exists in the output directory")
            .required(true)
            .defaultValue(FAIL_RESOLUTION_AV.getValue())
            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
            .build();

    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
            .name("writing-strategy")
            .displayName("Writing Strategy")
            .description("Defines the approach for writing the FlowFile data.")
            .required(true)
            .defaultValue(WRITE_AND_RENAME_AV.getValue())
            .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
            .build();

    public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
            .name("Block Size")
            .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
            .name("IO Buffer Size")
            .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder()
            .name("Replication")
            .description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration")
            .addValidator(HadoopValidators.POSITIVE_SHORT_VALIDATOR)
            .build();

    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
            .name("Permissions umask")
            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop property \"fs.permissions.umask-mode\".  If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.  If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.")
            .addValidator(HadoopValidators.UMASK_VALIDATOR)
            .build();

    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
            .name("Remote Owner")
            .description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
            .name("Remote Group")
            .description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor IGNORE_LOCALITY = new PropertyDescriptor.Builder()
            .name("Ignore Locality")
            .displayName("Ignore Locality")
            .description("Directs the HDFS system to ignore locality rules so that data is distributed randomly throughout the cluster")
            .required(false)
            .defaultValue("false")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Directory ACLs, cached per schedule so they are not re-queried from HDFS for every FlowFile. */
    private Cache<Path, AclStatus> aclCache;

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the inherited Hadoop properties followed by this processor's own properties. The
     * {@code Directory} property is re-described to say that missing directories are created.
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> props = new ArrayList<>(this.properties);
        props.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(DIRECTORY)
                .description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
                .build());
        props.add(CONFLICT_RESOLUTION);
        props.add(WRITING_STRATEGY);
        props.add(BLOCK_SIZE);
        props.add(BUFFER_SIZE);
        props.add(REPLICATION_FACTOR);
        props.add(UMASK);
        props.add(REMOTE_OWNER);
        props.add(REMOTE_GROUP);
        props.add(COMPRESSION_CODEC);
        props.add(IGNORE_LOCALITY);
        return props;
    }

    /**
     * Applies the configured umask (parsed as octal) to the Hadoop configuration. When the property is
     * unset, the umask already present in the configuration is written back unchanged.
     */
    @Override
    protected void preProcessConfiguration(final Configuration configuration, final ProcessContext processContext) {
        final PropertyValue umaskProperty = processContext.getProperty(UMASK);
        final short umask = umaskProperty.isSet()
                ? Short.parseShort(umaskProperty.getValue(), 8)
                : FsPermission.getUMask(configuration).toShort();
        FsPermission.setUMask(configuration, new FsPermission(umask));
    }

    /** Creates a fresh ACL cache each time the processor is scheduled. */
    @OnScheduled
    public void onScheduled(final ProcessContext processContext) {
        this.aclCache = Caffeine.newBuilder()
                .maximumSize(ACL_CACHE_MAX_ENTRIES)
                .expireAfterWrite(ACL_CACHE_TTL)
                .build();
    }

    /** Drops all cached ACLs when the processor is stopped. */
    @OnStopped
    public void onStopped() {
        if (this.aclCache != null) {
            this.aclCache.invalidateAll();
        }
    }

    /**
     * Writes one FlowFile to HDFS as the configured Hadoop user.
     *
     * <p>The FlowFile goes to failure (and the processor yields) when the file system, configuration or UGI
     * is unavailable. Kerberos "no credentials" failures roll the session back with penalization. Other IO
     * errors route the FlowFile to failure, and any other error penalizes it, routes it to failure and yields.
     */
    @Override
    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final FileSystem fileSystem = getFileSystem();
        final Configuration configuration = getConfiguration();
        final UserGroupInformation userGroupInformation = getUserGroupInformation();
        if (configuration == null || fileSystem == null || userGroupInformation == null) {
            getLogger().error("HDFS not configured properly");
            processSession.transfer(flowFile, getFailureRelationship());
            processContext.yield();
            return;
        }

        userGroupInformation.doAs(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                // Dot-file that must be removed if anything fails; null when no such file is pending.
                Path tempDotCopyFile = null;
                // Latest version of the FlowFile known to the session (putAttribute returns new versions).
                FlowFile putFlowFile = flowFile;
                try {
                    final String writingStrategy = processContext.getProperty(WRITING_STRATEGY).getValue();
                    final Path dirPath = getNormalizedPath(processContext, DIRECTORY, putFlowFile);
                    final String conflictResponse = processContext.getProperty(CONFLICT_RESOLUTION).getValue();
                    final long blockSize = getBlockSize(processContext, processSession, putFlowFile, dirPath);
                    final int bufferSize = getBufferSize(processContext, processSession, putFlowFile);
                    final short replication = getReplication(processContext, processSession, putFlowFile, dirPath);
                    final CompressionCodec codec = getCompressionCodec(processContext, configuration);

                    final String baseName = putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
                    final String filename = codec != null ? baseName + codec.getDefaultExtension() : baseName;

                    final Path tempCopyFile = new Path(dirPath, "." + filename);
                    final Path copyFile = new Path(dirPath, filename);
                    final boolean writeAndRename = WRITE_AND_RENAME.equals(writingStrategy);
                    final Path actualCopyFile = writeAndRename ? tempCopyFile : copyFile;

                    final boolean targetDirCreated = ensureTargetDirectory(dirPath);

                    final boolean destinationExists = fileSystem.exists(copyFile);
                    if (destinationExists) {
                        switch (conflictResponse) {
                            case REPLACE_RESOLUTION:
                                if (fileSystem.delete(copyFile, false)) {
                                    getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{copyFile, putFlowFile});
                                }
                                break;
                            case IGNORE_RESOLUTION:
                                processSession.transfer(putFlowFile, getSuccessRelationship());
                                getLogger().info("transferring {} to success because file with same name already exists", new Object[]{putFlowFile});
                                return null;
                            case FAIL_RESOLUTION:
                                processSession.transfer(processSession.penalize(putFlowFile), getFailureRelationship());
                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
                                return null;
                            default:
                                // APPEND_RESOLUTION: handled below by opening the existing file for append.
                                break;
                        }
                    }

                    // Appending writes straight into the existing destination; no temp file or rename involved.
                    final boolean appending = APPEND_RESOLUTION.equals(conflictResponse) && destinationExists;
                    if (writeAndRename && !appending) {
                        // From here on a dot-file may exist on HDFS, so any failure must clean it up.
                        tempDotCopyFile = tempCopyFile;
                    }

                    final StopWatch stopWatch = new StopWatch(true);
                    processSession.read(putFlowFile, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream in) throws IOException {
                            OutputStream out = null;
                            // Only a file created by this call may be deleted on failure. An appended-to
                            // file holds pre-existing data and must never be removed.
                            Path createdFile = null;
                            try {
                                final FSDataOutputStream hdfsOut;
                                if (appending) {
                                    hdfsOut = fileSystem.append(copyFile, bufferSize);
                                } else {
                                    final EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
                                    if (shouldIgnoreLocality(processContext, processSession)) {
                                        flags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                    }
                                    final FsPermission permission = FsCreateModes.applyUMask(
                                            FsPermission.getFileDefault(), FsPermission.getUMask(fileSystem.getConf()));
                                    hdfsOut = fileSystem.create(actualCopyFile, permission, flags, bufferSize,
                                            replication, blockSize, (Progressable) null, (Options.ChecksumOpt) null);
                                    createdFile = actualCopyFile;
                                }

                                out = hdfsOut;
                                if (codec != null) {
                                    out = codec.createOutputStream(hdfsOut);
                                }
                                StreamUtils.copy(new BufferedInputStream(in), out);
                                out.flush();
                            } finally {
                                if (out != null) {
                                    try {
                                        out.close();
                                    } catch (final Throwable closeFailure) {
                                        // Remote HDFS problems often only surface on close(); don't leave a
                                        // partially written file behind.
                                        if (createdFile != null) {
                                            try {
                                                fileSystem.delete(createdFile, false);
                                            } catch (final Throwable ignored) {
                                                // Best-effort cleanup; the close failure is the error to report.
                                            }
                                        }
                                        throw closeFailure;
                                    }
                                }
                            }
                        }
                    });
                    stopWatch.stop();
                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

                    if (tempDotCopyFile != null) {
                        if (!renameWithRetries(tempCopyFile, copyFile)) {
                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile + " to its final filename");
                        }
                        // The dot-file no longer exists under its temporary name; nothing left to clean up.
                        tempDotCopyFile = null;
                    }

                    if (!appending) {
                        // A new file was written (either strategy); apply the configured owner/group to it.
                        changeOwner(processContext, fileSystem, copyFile, flowFile);
                    }

                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                            new Object[]{putFlowFile, copyFile, millis, dataRate});

                    putFlowFile = processSession.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), copyFile.getName());
                    putFlowFile = processSession.putAttribute(putFlowFile, MoveHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE, copyFile.getParent().toString());
                    putFlowFile = processSession.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));

                    final Path qualifiedPath = copyFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
                    processSession.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                    processSession.transfer(putFlowFile, getSuccessRelationship());
                    return null;
                } catch (final IOException e) {
                    final Optional<GSSException> credentialFailure =
                            findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
                    if (credentialFailure.isPresent()) {
                        getLogger().warn("An error occurred while connecting to HDFS. Rolling back session, and penalizing flow file {}",
                                new Object[]{putFlowFile.getAttribute(CoreAttributes.UUID.key()), credentialFailure.get()});
                        processSession.rollback(true);
                        return null;
                    }
                    removeTempFile(tempDotCopyFile);
                    getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
                    processSession.transfer(putFlowFile, getFailureRelationship());
                    return null;
                } catch (final Throwable t) {
                    removeTempFile(tempDotCopyFile);
                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
                    processSession.transfer(processSession.penalize(putFlowFile), getFailureRelationship());
                    processContext.yield();
                    if (t instanceof InterruptedException) {
                        // Restore the flag swallowed by the rename back-off sleep (after cleanup, so HDFS
                        // calls above are not disturbed by it).
                        Thread.currentThread().interrupt();
                    }
                    return null;
                }
            }

            /**
             * Makes sure {@code dirPath} is an existing directory, creating it when absent. ACLs are checked
             * against the umask property either way, and a newly created directory gets the configured owner
             * and group.
             *
             * @return {@code true} if the directory was created by this call
             * @throws IOException if the path exists but is not a directory, cannot be created, or has a
             *                     default ACL while the umask property is set
             */
            private boolean ensureTargetDirectory(final Path dirPath) throws IOException {
                final FileStatus existing;
                try {
                    existing = fileSystem.getFileStatus(dirPath);
                } catch (final FileNotFoundException notFound) {
                    if (!fileSystem.mkdirs(dirPath)) {
                        throw new IOException(dirPath.toString() + " could not be created");
                    }
                    if (fileSystem.getFileStatus(dirPath).hasAcl()) {
                        checkAclStatus(getAclStatus(dirPath));
                    }
                    changeOwner(processContext, fileSystem, dirPath, flowFile);
                    return true;
                }

                if (!existing.isDirectory()) {
                    throw new IOException(dirPath.toString() + " already exists and is not a directory");
                }
                if (existing.hasAcl()) {
                    checkAclStatus(getAclStatus(dirPath));
                }
                return false;
            }

            /**
             * Tries up to {@link #RENAME_ATTEMPTS} times to rename {@code from} to {@code to}, sleeping
             * briefly between attempts.
             *
             * @return {@code true} once a rename succeeds, {@code false} if every attempt failed
             */
            private boolean renameWithRetries(final Path from, final Path to) throws IOException, InterruptedException {
                for (int attempt = 0; attempt < RENAME_ATTEMPTS; attempt++) {
                    if (fileSystem.rename(from, to)) {
                        return true;
                    }
                    // Give whatever blocked the rename a moment to resolve before retrying.
                    Thread.sleep(RENAME_RETRY_DELAY_MILLIS);
                }
                return false;
            }

            /** Best-effort deletion of a pending temporary file; failures are logged, not propagated. */
            private void removeTempFile(final Path tempFile) {
                if (tempFile == null) {
                    return;
                }
                try {
                    fileSystem.delete(tempFile, false);
                } catch (final Exception e) {
                    getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempFile, e});
                }
            }

            /**
             * Rejects the write when the directory has a DEFAULT-scope ACL entry and the umask property is
             * set, because HDFS ignores the umask in that case.
             */
            private void checkAclStatus(final AclStatus aclStatus) throws IOException {
                final boolean hasDefaultAcl = aclStatus.getEntries().stream()
                        .anyMatch(entry -> AclEntryScope.DEFAULT.equals(entry.getScope()));
                final boolean umaskSet = processContext.getProperty(UMASK).isSet();
                if (hasDefaultAcl && umaskSet) {
                    throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
                }
            }

            /**
             * Returns the ACL of {@code dirPath}, loading it from HDFS on a cache miss.
             *
             * @throws UncheckedIOException if the ACL cannot be queried
             */
            private AclStatus getAclStatus(final Path dirPath) {
                return aclCache.get(dirPath, key -> {
                    try {
                        return fileSystem.getAclStatus(key);
                    } catch (final IOException e) {
                        throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", key), e);
                    }
                });
            }
        });
    }

    /** Relationship for FlowFiles written successfully; subclasses may override. */
    protected Relationship getSuccessRelationship() {
        return REL_SUCCESS;
    }

    /** Relationship for FlowFiles that could not be written; subclasses may override. */
    protected Relationship getFailureRelationship() {
        return REL_FAILURE;
    }

    /** Block size from {@link #BLOCK_SIZE} in bytes, else the file system default for {@code path}. */
    protected long getBlockSize(final ProcessContext processContext, final ProcessSession processSession, final FlowFile flowFile, final Path path) {
        final Double configured = processContext.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
        return configured != null ? configured.longValue() : getFileSystem().getDefaultBlockSize(path);
    }

    /**
     * Buffer size from {@link #BUFFER_SIZE} in bytes. Otherwise the Hadoop {@value #BUFFER_SIZE_KEY}
     * setting is used, defaulting to {@value #BUFFER_SIZE_DEFAULT}.
     */
    protected int getBufferSize(final ProcessContext processContext, final ProcessSession processSession, final FlowFile flowFile) {
        final Double configured = processContext.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        return configured != null ? configured.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
    }

    /** Replication from {@link #REPLICATION_FACTOR}, else the file system default for {@code path}. */
    protected short getReplication(final ProcessContext processContext, final ProcessSession processSession, final FlowFile flowFile, final Path path) {
        final Integer configured = processContext.getProperty(REPLICATION_FACTOR).asInteger();
        return configured != null ? configured.shortValue() : getFileSystem().getDefaultReplication(path);
    }

    /** Whether {@link CreateFlag#IGNORE_CLIENT_LOCALITY} should be passed when creating files. */
    protected boolean shouldIgnoreLocality(final ProcessContext processContext, final ProcessSession processSession) {
        return processContext.getProperty(IGNORE_LOCALITY).asBoolean();
    }

    /** Evaluated {@link #REMOTE_OWNER} for {@code flowFile}, or {@code null} if unset or empty. */
    protected String getOwner(final ProcessContext processContext, final FlowFile flowFile) {
        return emptyToNull(processContext.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue());
    }

    /** Evaluated {@link #REMOTE_GROUP} for {@code flowFile}, or {@code null} if unset or empty. */
    protected String getGroup(final ProcessContext processContext, final FlowFile flowFile) {
        return emptyToNull(processContext.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue());
    }

    /** Maps {@code null} and {@code ""} to {@code null}; any other string is returned unchanged. */
    private static String emptyToNull(final String value) {
        return value == null || value.isEmpty() ? null : value;
    }

    /**
     * Finds the first throwable in the causal chain of {@code th}, including {@code th} itself, that is an
     * instance of {@code cls} and satisfies {@code predicate}.
     */
    public <T extends Throwable> Optional<T> findCause(final Throwable th, final Class<T> cls, final Predicate<T> predicate) {
        return Throwables.getCausalChain(th).stream()
                .filter(cls::isInstance)
                .map(cls::cast)
                .filter(predicate)
                .findFirst();
    }

    /**
     * Applies the configured owner and/or group to {@code path}. Nothing happens when neither is configured.
     * Failures are logged as warnings and otherwise ignored, so they never fail the FlowFile.
     */
    protected void changeOwner(final ProcessContext processContext, final FileSystem fileSystem, final Path path, final FlowFile flowFile) {
        try {
            final String owner = getOwner(processContext, flowFile);
            final String group = getGroup(processContext, flowFile);
            if (owner != null || group != null) {
                fileSystem.setOwner(path, owner, group);
            }
        } catch (final Exception e) {
            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{path, e});
        }
    }
}
