package org.apache.nifi.processors.hadoop;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;

/**
 * Base class for processors that read the records of an incoming FlowFile with a configured
 * {@link RecordReaderFactory} and write them to a file on a Hadoop {@link FileSystem} through an
 * {@link HDFSRecordWriter} supplied by the subclass.
 *
 * <p>Records are first written to a temporary file named {@code "." + filename} inside the target
 * directory. Only after the write succeeds is the temporary file renamed to its final name, so a
 * partially written file never appears under the final name.</p>
 */
@DefaultSettings(yieldDuration = "100 ms")
@TriggerWhenEmpty
public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {

    /** Compression type property; allowable values and default are supplied by the subclass in {@link #init}. */
    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
            .name("compression-type")
            .displayName("Compression Type")
            .description("The type of compression for the file being written.")
            .required(true)
            .build();

    /** Whether an existing file with the same name may be replaced. */
    public static final PropertyDescriptor OVERWRITE = new PropertyDescriptor.Builder()
            .name("overwrite")
            .displayName("Overwrite Files")
            .description("Whether or not to overwrite existing files in the same directory with the same name. When set to false, flow files will be routed to failure when a file exists in the same directory with the same name.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    /** Optional octal umask applied to the Hadoop configuration in {@link #preProcessConfiguration}. */
    public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
            .name("permissions-umask")
            .displayName("Permissions umask")
            .description("A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
            .addValidator(HadoopValidators.UMASK_VALIDATOR)
            .build();

    /** Optional owner to set on written files and newly created directories. */
    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
            .name("remote-owner")
            .displayName("Remote Owner")
            .description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional group to set on written files and newly created directories. */
    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
            .name("remote-group")
            .displayName("Remote Group")
            .description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Controller service used to create a {@link RecordReader} for each incoming FlowFile. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("The service for reading records from incoming flow files.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Flow Files that have been successfully processed are transferred to this relationship")
            .build();

    public static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("Flow Files that could not be processed due to issues that can be retried are transferred to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Flow Files that could not be processed due to issue that cannot be retried are transferred to this relationship")
            .build();

    /** Name of the FlowFile attribute that receives the number of records written. */
    public static final String RECORD_COUNT_ATTR = "record.count";

    /** Umask used when the {@link #UMASK} property is not set: octal 022 (decimal 18). */
    private static final short DEFAULT_UMASK = 0022;

    /** Maximum number of times {@link #rename} tries to move the temporary file into place. */
    private static final int RENAME_ATTEMPTS = 10;

    /** Pause between two consecutive rename attempts, in milliseconds. */
    private static final long RENAME_RETRY_DELAY_MILLIS = 200L;

    private volatile String remoteOwner;
    private volatile String remoteGroup;
    private volatile Set<Relationship> putHdfsRecordRelationships;
    private volatile List<PropertyDescriptor> putHdfsRecordProperties;

    /**
     * Builds the immutable relationship set and property list exposed by this processor.
     *
     * <p>The property list starts with the properties inherited from the parent class, followed by
     * the properties declared here and finally those returned by {@link #getAdditionalProperties()}.</p>
     *
     * @param processorInitializationContext the initialization context, forwarded to the parent and
     *                                       to the compression-type hooks
     */
    protected final void init(ProcessorInitializationContext processorInitializationContext) {
        super.init(processorInitializationContext);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_RETRY);
        relationships.add(REL_FAILURE);
        this.putHdfsRecordRelationships = Collections.unmodifiableSet(relationships);

        final List<AllowableValue> compressionTypes = getCompressionTypes(processorInitializationContext);

        final List<PropertyDescriptor> descriptors = new ArrayList<>(this.properties);
        descriptors.add(RECORD_READER);
        descriptors.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(DIRECTORY)
                .description("The parent directory to which files should be written. Will be created if it doesn't exist.")
                .build());
        descriptors.add(new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(COMPRESSION_TYPE)
                .allowableValues(compressionTypes.toArray(new AllowableValue[0]))
                .defaultValue(getDefaultCompressionType(processorInitializationContext))
                .build());
        descriptors.add(OVERWRITE);
        descriptors.add(UMASK);
        descriptors.add(REMOTE_GROUP);
        descriptors.add(REMOTE_OWNER);
        descriptors.addAll(getAdditionalProperties());
        this.putHdfsRecordProperties = Collections.unmodifiableList(descriptors);
    }

    /**
     * @param processorInitializationContext the initialization context
     * @return the values offered as allowable values of the compression type property
     */
    public abstract List<AllowableValue> getCompressionTypes(ProcessorInitializationContext processorInitializationContext);

    /**
     * @param processorInitializationContext the initialization context
     * @return the default value of the compression type property
     */
    public abstract String getDefaultCompressionType(ProcessorInitializationContext processorInitializationContext);

    /**
     * Hook for subclasses to contribute extra properties, appended after the ones declared here.
     *
     * @return additional property descriptors; empty by default
     */
    public List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.emptyList();
    }

    /** @return the unmodifiable set of relationships built in {@link #init} */
    public final Set<Relationship> getRelationships() {
        return this.putHdfsRecordRelationships;
    }

    /** @return the unmodifiable list of supported properties built in {@link #init} */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.putHdfsRecordProperties;
    }

    /**
     * Applies the configured umask to the Hadoop configuration, falling back to octal 022 when the
     * {@link #UMASK} property is not set.
     *
     * @param configuration  the Hadoop configuration to modify
     * @param processContext the context providing the {@link #UMASK} property
     */
    protected void preProcessConfiguration(Configuration configuration, ProcessContext processContext) {
        final PropertyValue umaskProperty = processContext.getProperty(UMASK);
        final short umask = umaskProperty.isSet()
                ? Short.parseShort(umaskProperty.getValue(), 8) // the property value is an octal string
                : DEFAULT_UMASK;
        FsPermission.setUMask(configuration, new FsPermission(umask));
    }

    /**
     * Runs the parent scheduling logic and caches the remote owner and group property values.
     *
     * @param processContext the context providing the configured properties
     * @throws IOException if the parent scheduling logic fails
     */
    @OnScheduled
    public final void onScheduled(ProcessContext processContext) throws IOException {
        super.abstractOnScheduled(processContext);
        this.remoteOwner = processContext.getProperty(REMOTE_OWNER).getValue();
        this.remoteGroup = processContext.getProperty(REMOTE_GROUP).getValue();
    }

    /**
     * Creates the writer that persists the records of one FlowFile.
     *
     * @param processContext the process context
     * @param flowFile       the FlowFile whose records are being written
     * @param configuration  the Hadoop configuration
     * @param path           the file to write to; {@link #onTrigger} passes the temporary file
     * @param recordSchema   the schema reported by the record reader
     * @return a writer for the given path
     * @throws IOException             if the writer cannot be created
     * @throws SchemaNotFoundException if a required schema cannot be located
     */
    public abstract HDFSRecordWriter createHDFSRecordWriter(ProcessContext processContext, FlowFile flowFile, Configuration configuration, Path path, RecordSchema recordSchema) throws IOException, SchemaNotFoundException;

    /**
     * Takes one FlowFile from the session and writes its records to the target file system as the
     * configured Hadoop user.
     *
     * <p>Yields without touching the session when the Hadoop resources are not available or when no
     * FlowFile is queued.</p>
     *
     * @param processContext the process context
     * @param processSession the process session
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        // Fetch the Hadoop resources before pulling a FlowFile so nothing is dequeued when they are unavailable.
        final FileSystem fileSystem = getFileSystem();
        final Configuration configuration = getConfiguration();
        final UserGroupInformation userGroupInformation = getUserGroupInformation();
        if (configuration == null || fileSystem == null || userGroupInformation == null) {
            getLogger().error("Processor not configured properly because Configuration, FileSystem, or UserGroupInformation was null");
            processContext.yield();
            return;
        }

        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            processContext.yield();
            return;
        }

        // The explicit cast selects the PrivilegedAction overload of doAs; a bare lambda would be
        // ambiguous with the PrivilegedExceptionAction overload.
        userGroupInformation.doAs((PrivilegedAction<Object>) () -> {
            writeFlowFile(processContext, processSession, flowFile, fileSystem, configuration);
            return null;
        });
    }

    /**
     * Writes the records of {@code flowFile} to a temporary file, renames it to its final name and
     * routes the FlowFile to success, retry or failure depending on the outcome.
     */
    private void writeFlowFile(final ProcessContext processContext, final ProcessSession processSession, final FlowFile flowFile,
                               final FileSystem fileSystem, final Configuration configuration) {
        // Set once writing may have started, so that the catch blocks know what to clean up.
        Path tempFileToCleanUp = null;
        // Always refers to the most recent version of the FlowFile so the catch blocks never transfer a stale one.
        FlowFile currentFlowFile = flowFile;
        try {
            final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
            final Path directory = getNormalizedPath(processContext, DIRECTORY, flowFile);
            createDirectory(fileSystem, directory, this.remoteOwner, this.remoteGroup);

            // Write to a dot-prefixed temporary file first and rename it on success.
            final Path tempFile = pathBoundTo(fileSystem, directory, "." + filename);
            final Path destFile = pathBoundTo(fileSystem, directory, filename);

            final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
            final boolean shouldOverwrite = processContext.getProperty(OVERWRITE).asBoolean().booleanValue();
            if (destinationOrTempExists && !shouldOverwrite) {
                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{flowFile});
                return;
            }

            final AtomicReference<Throwable> failure = new AtomicReference<>(null);
            final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
            final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

            tempFileToCleanUp = tempFile;
            final StopWatch stopWatch = new StopWatch(true);
            processSession.read(flowFile, inputStream -> {
                RecordReader recordReader = null;
                HDFSRecordWriter recordWriter = null;
                try {
                    // Reader creation problems are reported separately so they are not mistaken
                    // for I/O errors, which are routed to retry.
                    try {
                        recordReader = readerFactory.createRecordReader(flowFile, inputStream, getLogger());
                    } catch (Exception e) {
                        failure.set(new RecordReaderFactoryException("Unable to create RecordReader", e));
                        return;
                    }

                    final RecordSet recordSet = recordReader.createRecordSet();
                    recordWriter = createHDFSRecordWriter(processContext, flowFile, configuration, tempFile, recordReader.getSchema());
                    writeResult.set(recordWriter.write(recordSet));
                } catch (Exception e) {
                    failure.set(e);
                } finally {
                    IOUtils.closeQuietly(recordReader);
                    IOUtils.closeQuietly(recordWriter);
                }
            });
            stopWatch.stop();

            final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

            // Re-throw anything recorded inside the read callback so it reaches the matching catch block below.
            final Throwable recordedFailure = failure.get();
            if (recordedFailure != null) {
                throw recordedFailure;
            }

            if (fileSystem.exists(destFile) && shouldOverwrite && fileSystem.delete(destFile, false)) {
                getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{destFile, flowFile});
            }

            rename(fileSystem, tempFile, destFile);
            changeOwner(fileSystem, destFile, this.remoteOwner, this.remoteGroup);
            getLogger().info("Wrote {} to {} in {} milliseconds at a rate of {}", new Object[]{flowFile, destFile, millis, dataRate});

            currentFlowFile = postProcess(processContext, processSession, currentFlowFile, destFile);

            final WriteResult result = writeResult.get();
            final Map<String, String> attributes = new HashMap<>(result.getAttributes());
            attributes.put(CoreAttributes.FILENAME.key(), destFile.getName());
            attributes.put("absolute.hdfs.path", destFile.getParent().toString());
            attributes.put(RECORD_COUNT_ATTR, String.valueOf(result.getRecordCount()));
            currentFlowFile = processSession.putAllAttributes(currentFlowFile, attributes);

            final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
            currentFlowFile = processSession.putAttribute(currentFlowFile, "hadoop.file.url", qualifiedPath.toString());
            processSession.getProvenanceReporter().send(currentFlowFile, qualifiedPath.toString());
            processSession.transfer(currentFlowFile, REL_SUCCESS);
        } catch (IOException | FlowFileAccessException e) {
            deleteQuietly(fileSystem, tempFileToCleanUp);
            getLogger().error("Failed to write due to {}", new Object[]{e});
            processSession.transfer(processSession.penalize(currentFlowFile), REL_RETRY);
            processContext.yield();
        } catch (Throwable t) {
            deleteQuietly(fileSystem, tempFileToCleanUp);
            getLogger().error("Failed to write due to {}", new Object[]{t});
            processSession.transfer(currentFlowFile, REL_FAILURE);
            if (t instanceof InterruptedException) {
                // Thread.sleep in rename() cleared the flag when it threw; restore it for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Creates a {@link Path} whose {@code getFileSystem} always returns the given file system. */
    private static Path pathBoundTo(final FileSystem fileSystem, final Path parent, final String child) {
        return new Path(parent, child) {
            public FileSystem getFileSystem(Configuration ignoredConfiguration) throws IOException {
                return fileSystem;
            }
        };
    }

    /**
     * Hook invoked after the file has been renamed to its final name and before the FlowFile
     * attributes are updated. The default implementation returns the FlowFile unchanged.
     *
     * @param processContext the process context
     * @param processSession the process session
     * @param flowFile       the FlowFile that was written
     * @param path           the final path of the written file
     * @return the FlowFile to continue processing with
     */
    protected FlowFile postProcess(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile, Path path) {
        return flowFile;
    }

    /**
     * Renames {@code srcFile} to {@code destFile}, retrying up to ten times with a 200 ms pause
     * between attempts. If every attempt fails the source file is deleted.
     *
     * @param fileSystem the file system holding both paths
     * @param srcFile    the file to rename
     * @param destFile   the desired final path
     * @throws IOException          if the file system reports an I/O error
     * @throws InterruptedException if interrupted while waiting between attempts
     * @throws FailureException     if the rename did not succeed after all attempts
     */
    protected void rename(FileSystem fileSystem, Path srcFile, Path destFile) throws IOException, InterruptedException, FailureException {
        for (int attempt = 1; attempt <= RENAME_ATTEMPTS; attempt++) {
            if (fileSystem.rename(srcFile, destFile)) {
                return;
            }
            // Waiting only makes sense when another attempt follows.
            if (attempt < RENAME_ATTEMPTS) {
                Thread.sleep(RENAME_RETRY_DELAY_MILLIS);
            }
        }
        fileSystem.delete(srcFile, false);
        throw new FailureException("Could not rename file " + srcFile + " to its final filename");
    }

    /**
     * Deletes {@code path} non-recursively, logging instead of propagating any exception.
     * Does nothing when {@code path} is {@code null}.
     *
     * @param fileSystem the file system holding the path
     * @param path       the file to delete, may be {@code null}
     */
    protected void deleteQuietly(FileSystem fileSystem, Path path) {
        if (path == null) {
            return;
        }
        try {
            fileSystem.delete(path, false);
        } catch (Exception e) {
            getLogger().error("Unable to remove file {} due to {}", new Object[]{path, e});
        }
    }

    /**
     * Sets owner and/or group of {@code path}; failures are logged as warnings and not propagated.
     * Does nothing when both {@code owner} and {@code group} are {@code null}.
     *
     * @param fileSystem the file system holding the path
     * @param path       the file or directory to change
     * @param owner      the new owner, or {@code null}
     * @param group      the new group, or {@code null}
     */
    protected void changeOwner(FileSystem fileSystem, Path path, String owner, String group) {
        if (owner == null && group == null) {
            return;
        }
        try {
            fileSystem.setOwner(path, owner, group);
        } catch (Exception e) {
            getLogger().warn("Could not change owner or group of {} on due to {}", new Object[]{path, e});
        }
    }

    /**
     * Ensures that {@code directory} exists and is a directory, creating it (and applying owner
     * and group) when it is missing.
     *
     * @param fileSystem the file system to operate on
     * @param directory  the directory that must exist
     * @param owner      owner for a newly created directory, or {@code null}
     * @param group      group for a newly created directory, or {@code null}
     * @throws IOException      if the file system reports an I/O error
     * @throws FailureException if the path exists but is not a directory, or cannot be created
     */
    protected void createDirectory(FileSystem fileSystem, Path directory, String owner, String group) throws IOException, FailureException {
        try {
            if (!fileSystem.getFileStatus(directory).isDirectory()) {
                throw new FailureException(directory.toString() + " already exists and is not a directory");
            }
        } catch (FileNotFoundException e) {
            // The directory does not exist yet: create it and apply the requested ownership.
            if (!fileSystem.mkdirs(directory)) {
                throw new FailureException(directory.toString() + " could not be created");
            }
            changeOwner(fileSystem, directory, owner, group);
        }
    }
}
