package org.apache.nifi.processors.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."), @WritesAttribute(attribute = PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE, description = "The absolute path to the file on HDFS is stored in this attribute.")})
@SeeAlso({GetHDFS.class})
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
/* loaded from: input_file:org/apache/nifi/processors/hadoop/PutHDFS.class */
public class PutHDFS extends AbstractHadoopProcessor {
    // Hadoop configuration key for the I/O buffer size; onTrigger reads this configuration entry
    // when the "IO Buffer Size" property is not set.
    public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    // Buffer size in bytes used by onTrigger when neither the property nor the Hadoop configuration supplies one.
    public static final int BUFFER_SIZE_DEFAULT = 4096;
    // FlowFile attribute written on success; onTrigger stores the parent directory of the written file under it.
    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
    // Unmodifiable set of REL_SUCCESS and REL_FAILURE; assigned in the static initializer at the end of the class.
    private static final Set<Relationship> relationships;
    // Destination for FlowFiles written to HDFS (and for FlowFiles skipped under the "ignore" conflict strategy).
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully written to HDFS are transferred to this relationship").build();
    // Destination for FlowFiles that could not be written, including name conflicts under the "fail" strategy.
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be written to HDFS for some reason are transferred to this relationship").build();
    // Target directory; required, supports expression language and is evaluated against each FlowFile in onTrigger.
    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name(AbstractHadoopProcessor.DIRECTORY_PROP_NAME).description("The parent HDFS directory to which files should be written").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build();
    // Allowable values of CONFLICT_RESOLUTION, matched by onTrigger when the destination file already exists.
    public static final String FAIL_RESOLUTION = "fail";
    public static final String REPLACE_RESOLUTION = "replace";
    public static final String IGNORE_RESOLUTION = "ignore";
    // What to do when the destination file already exists; required, defaults to FAIL_RESOLUTION.
    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder().name("Conflict Resolution Strategy").description("Indicates what should happen when a file with the same name already exists in the output directory").required(true).defaultValue(FAIL_RESOLUTION).allowableValues(new String[]{REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION}).build();
    // Optional data-size property; when unset, onTrigger uses the file system's default block size for the directory.
    public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder().name("Block Size").description("Size of each block as written to HDFS. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    // Optional data-size property; when unset, onTrigger falls back to BUFFER_SIZE_KEY / BUFFER_SIZE_DEFAULT.
    public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder().name("IO Buffer Size").description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();
    // Optional; must parse as a short greater than zero. When unset, onTrigger uses the file system's default replication.
    public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder().name("Replication").description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration").addValidator(createPositiveShortValidator()).build();
    // Optional octal umask (validated to 0..0777); applied to the Hadoop configuration once in onScheduled.
    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder().name("Permissions umask").description("A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode").addValidator(createUmaskValidator()).build();
    // Optional owner / group passed to FileSystem.setOwner by changeOwner after a file or directory is created.
    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner").description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group").description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the unmodifiable set built in the static initializer, containing
     *         {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Builds the list of properties supported by this processor: the properties
     * inherited from {@link AbstractHadoopProcessor} followed by the PutHDFS-specific ones.
     *
     * @return a new, mutable list created on every call; the inherited list is copied, not modified
     */
    @Override // org.apache.nifi.processors.hadoop.AbstractHadoopProcessor
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Typed list instead of a raw ArrayList so the return needs no unchecked conversion.
        final List<PropertyDescriptor> descriptors = new ArrayList<>(this.properties);
        descriptors.add(DIRECTORY);
        descriptors.add(CONFLICT_RESOLUTION);
        descriptors.add(BLOCK_SIZE);
        descriptors.add(BUFFER_SIZE);
        descriptors.add(REPLICATION_FACTOR);
        descriptors.add(UMASK);
        descriptors.add(REMOTE_OWNER);
        descriptors.add(REMOTE_GROUP);
        descriptors.add(COMPRESSION_CODEC);
        return descriptors;
    }

    /**
     * Runs the parent scheduling logic and then installs the umask on the Hadoop configuration.
     *
     * @param processContext supplies the optional {@link #UMASK} property
     * @throws Exception propagated from the parent scheduling logic or from parsing the umask
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws Exception {
        super.abstractOnScheduled(processContext);

        final PropertyValue umaskProperty = processContext.getProperty(UMASK);
        final Configuration hadoopConfiguration = getConfiguration();

        final short umask;
        if (umaskProperty.isSet()) {
            // The property value is an octal string, hence radix 8.
            umask = Short.parseShort(umaskProperty.getValue(), 8);
        } else {
            // Octal 022 (decimal 18) is used when no umask is configured.
            umask = (short) 022;
        }
        FsPermission.setUMask(hadoopConfiguration, new FsPermission(umask));
    }

    /**
     * Writes the content of one incoming FlowFile to HDFS.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Resolve the target directory, block size, buffer size, replication and compression codec
     *       from the processor properties, falling back to file system / configuration defaults.</li>
     *   <li>Create the target directory if it does not exist (and apply the configured owner/group).</li>
     *   <li>If the destination file already exists, apply the configured conflict resolution:
     *       delete it ("replace"), route the FlowFile to success untouched ("ignore"), or penalize
     *       and route to failure ("fail").</li>
     *   <li>Stream the content into a dot-prefixed temporary file in the target directory.</li>
     *   <li>Rename the temporary file to its final name, retrying up to 10 times with a 200 ms pause.</li>
     *   <li>Apply owner/group, update the {@code filename} and {@link #ABSOLUTE_HDFS_PATH_ATTRIBUTE}
     *       attributes, emit a provenance SEND event and route to success.</li>
     * </ol>
     *
     * <p>Any failure after the initial configuration check is logged; the temporary file is removed
     * if it had been written, the FlowFile is penalized and routed to failure, and the processor yields.
     *
     * @param processContext supplies the processor properties
     * @param processSession supplies the FlowFile and receives the transfer
     * @throws ProcessException declared by the framework contract; failures are routed to
     *                          {@link #REL_FAILURE} rather than thrown from the write path
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final Configuration configuration = getConfiguration();
        final FileSystem fileSystem = getFileSystem();
        if (configuration == null || fileSystem == null) {
            getLogger().error("HDFS not configured properly");
            processSession.transfer(flowFile, REL_FAILURE);
            processContext.yield();
            return;
        }

        final Path directory = new Path(processContext.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
        final String conflictResponse = processContext.getProperty(CONFLICT_RESOLUTION).getValue();

        final Double blockSizeProp = processContext.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
        final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : fileSystem.getDefaultBlockSize(directory);

        final Double bufferSizeProp = processContext.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);

        final Integer replicationProp = processContext.getProperty(REPLICATION_FACTOR).asInteger();
        final short replication = replicationProp != null ? replicationProp.shortValue() : fileSystem.getDefaultReplication(directory);

        final CompressionCodec codec = getCompressionCodec(processContext, configuration);

        // When a codec is configured its default extension is appended to the file name.
        final String filename = codec != null
                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
                : flowFile.getAttribute(CoreAttributes.FILENAME.key());

        // Set only once the temporary file has been fully written, so the failure path knows
        // whether there is something to clean up.
        Path tempDotCopyFile = null;
        try {
            final Path tempCopyFile = new Path(directory, "." + filename);
            final Path copyFile = new Path(directory, filename);

            // Create the destination directory if it does not exist. getFileStatus signals a
            // missing path with FileNotFoundException, so the lookup must sit inside this try.
            try {
                if (!fileSystem.getFileStatus(directory).isDirectory()) {
                    throw new IOException(directory.toString() + " already exists and is not a directory");
                }
            } catch (FileNotFoundException notFound) {
                if (!fileSystem.mkdirs(directory)) {
                    throw new IOException(directory.toString() + " could not be created");
                }
                changeOwner(processContext, fileSystem, directory);
            }

            // If the destination file already exists, resolve the conflict per configuration.
            if (fileSystem.exists(copyFile)) {
                switch (conflictResponse) {
                    case REPLACE_RESOLUTION:
                        if (fileSystem.delete(copyFile, false)) {
                            getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{copyFile, flowFile});
                        }
                        break;
                    case IGNORE_RESOLUTION:
                        processSession.transfer(flowFile, REL_SUCCESS);
                        getLogger().info("transferring {} to success because file with same name already exists", new Object[]{flowFile});
                        return;
                    case FAIL_RESOLUTION:
                        flowFile = processSession.penalize(flowFile);
                        processSession.transfer(flowFile, REL_FAILURE);
                        getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{flowFile});
                        return;
                    default:
                        break;
                }
            }

            // Write the FlowFile content to the temporary dot file.
            final StopWatch stopWatch = new StopWatch(true);
            processSession.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(InputStream inputStream) throws IOException {
                    OutputStream outputStream = null;
                    Path createdFile = null;
                    try {
                        outputStream = fileSystem.create(tempCopyFile, true, bufferSize, replication, blockSize);
                        if (codec != null) {
                            outputStream = codec.createOutputStream(outputStream);
                        }
                        createdFile = tempCopyFile;
                        StreamUtils.copy(new BufferedInputStream(inputStream), outputStream);
                        outputStream.flush();
                    } finally {
                        try {
                            if (outputStream != null) {
                                outputStream.close();
                            }
                        } catch (RemoteException remoteFailure) {
                            // A RemoteException on close means the write was rejected on the remote
                            // side: remove the partial file and fail the transfer.
                            if (createdFile != null) {
                                try {
                                    fileSystem.delete(createdFile, false);
                                } catch (Throwable ignored) {
                                    // Best-effort cleanup; the RemoteException below is the real failure.
                                }
                            }
                            throw remoteFailure;
                        } catch (Throwable closeFailure) {
                            // Other close failures stay non-fatal (as before) but are no longer silent.
                            getLogger().warn("Failed to close HDFS output stream for {} due to {}", new Object[]{tempCopyFile, closeFailure});
                        }
                    }
                }
            });
            stopWatch.stop();
            final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            tempDotCopyFile = tempCopyFile;

            // Rename can fail transiently; retry a bounded number of times.
            boolean renamed = false;
            for (int attempt = 0; attempt < 10; attempt++) {
                if (fileSystem.rename(tempCopyFile, copyFile)) {
                    renamed = true;
                    break;
                }
                Thread.sleep(200L);
            }
            if (!renamed) {
                fileSystem.delete(tempCopyFile, false);
                throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile + " to its final filename");
            }

            changeOwner(processContext, fileSystem, copyFile);

            getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", new Object[]{flowFile, copyFile, Long.valueOf(millis), dataRate});

            final String outputPath = copyFile.toString();
            // Keep flowFile pointing at the latest version so the failure path never uses a stale reference.
            flowFile = processSession.putAttribute(flowFile, CoreAttributes.FILENAME.key(), copyFile.getName());
            flowFile = processSession.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, copyFile.getParent().toString());
            final String transitUri = outputPath.startsWith("/") ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
            processSession.getProvenanceReporter().send(flowFile, transitUri);
            processSession.transfer(flowFile, REL_SUCCESS);
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Thread.sleep cleared the interrupt flag; restore it for the caller.
                Thread.currentThread().interrupt();
            }
            if (tempDotCopyFile != null) {
                try {
                    fileSystem.delete(tempDotCopyFile, false);
                } catch (Exception e) {
                    getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
                }
            }
            getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            processContext.yield();
        }
    }

    /**
     * Applies the configured {@link #REMOTE_OWNER} / {@link #REMOTE_GROUP} to a path.
     * Does nothing when neither property has a value. Any exception raised while reading the
     * properties or changing ownership is logged as a warning and not propagated.
     *
     * @param processContext supplies the owner and group properties
     * @param fileSystem     file system on which ownership is changed
     * @param path           file or directory whose owner/group should be updated
     */
    protected void changeOwner(ProcessContext processContext, FileSystem fileSystem, Path path) {
        try {
            final String owner = processContext.getProperty(REMOTE_OWNER).getValue();
            final String group = processContext.getProperty(REMOTE_GROUP).getValue();
            if (owner == null && group == null) {
                return;
            }
            fileSystem.setOwner(path, owner, group);
        } catch (Exception failure) {
            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{path, failure});
        }
    }

    /**
     * Creates a validator that accepts only input parsing as a decimal {@code short} greater than zero.
     *
     * @return a validator whose result is valid when the input is a positive short; otherwise the
     *         result carries an explanation of why the input was rejected
     */
    static Validator createPositiveShortValidator() {
        return new Validator() { // from class: org.apache.nifi.processors.hadoop.PutHDFS.2
            @Override
            public ValidationResult validate(String subject, String input, ValidationContext context) {
                String reason;
                try {
                    final short parsed = Short.parseShort(input);
                    reason = parsed > 0 ? null : "short integer must be greater than zero";
                } catch (NumberFormatException notANumber) {
                    reason = "[" + input + "] is not a valid short integer";
                }
                final boolean valid = reason == null;
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .explanation(reason)
                        .valid(valid)
                        .build();
            }
        };
    }

    /**
     * Creates a validator for octal umask strings. The input is parsed as a {@code short} in
     * radix 8 and must lie in the range 0..0777 (decimal 511).
     *
     * @return a validator whose result is valid for an in-range octal value; otherwise the result
     *         carries an explanation of why the input was rejected
     */
    static Validator createUmaskValidator() {
        return new Validator() { // from class: org.apache.nifi.processors.hadoop.PutHDFS.3
            @Override
            public ValidationResult validate(String subject, String input, ValidationContext context) {
                String reason = null;
                try {
                    final short umask = Short.parseShort(input, 8);
                    if (umask < 0) {
                        reason = "octal umask [" + input + "] cannot be negative";
                    } else if (umask > 0777) {
                        reason = "octal umask [" + input + "] is not a valid umask";
                    }
                } catch (NumberFormatException notOctal) {
                    reason = "[" + input + "] is not a valid short octal number";
                }
                final boolean valid = reason == null;
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .explanation(reason)
                        .valid(valid)
                        .build();
            }
        };
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
