package org.apache.nifi.processors.hadoop.inotify;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.FetchHDFS;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.GetHDFSFileInfo;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;

/**
 * Polls HDFS inotify events through the {@link HdfsAdmin} API and emits one flow file per event
 * that passes the configured event-type and path filters. Each flow file's content is the event
 * serialized to JSON. The last consumed transaction id is kept in cluster state so that the next
 * trigger can reopen the inotify stream from that transaction.
 */
@CapabilityDescription("This processor polls the notification events provided by the HdfsAdmin API. Since this uses the HdfsAdmin APIs it is required to run as an HDFS super user. Currently there are six types of events (append, close, create, metadata, rename, and unlink). Please see org.apache.hadoop.hdfs.inotify.Event documentation for full explanations of each event. This processor will poll for new events based on a defined duration. For each event received a new flow file will be created with the expected attributes and the event itself serialized to JSON and written to the flow file's content. For example, if event.type is APPEND then the content of the flow file will contain a JSON file containing the information about the append event. If successful the flow files are sent to the 'success' relationship. Be careful of where the generated flow files are stored. If the flow files are stored in one of processor's watch directories there will be a never ending flow of events. It is also important to be aware that this processor must consume all events. The filtering must happen within the processor. This is because the HDFS admin's event notifications API does not have filtering.")
@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "This is always application/json."), @WritesAttribute(attribute = "hdfs.inotify.event.type", description = "This will specify the specific HDFS notification event type. Currently there are six types of events (append, close, create, metadata, rename, and unlink)."), @WritesAttribute(attribute = "hdfs.inotify.event.path", description = "The specific path that the event is tied to.")})
@Stateful(scopes = {Scope.CLUSTER}, description = "The last used transaction id is stored. This is used to resume retrieving events from the last processed transaction.")
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "events", "inotify", "notifications", "filesystem"})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class, ListHDFS.class})
public class GetHDFSEvents extends AbstractHadoopProcessor {
    /** Cluster-state key under which the last consumed transaction id is stored. */
    private static final String LAST_TX_ID = "last.tx.id";
    /** Flow file attribute holding the inotify event type name. */
    private static final String EVENT_TYPE_ATTRIBUTE = "hdfs.inotify.event.type";
    /** Flow file attribute holding the path the event refers to. */
    private static final String EVENT_PATH_ATTRIBUTE = "hdfs.inotify.event.path";

    /**
     * Transaction id of the last consumed event batch. A value of -1 means no id is known, in which
     * case the stream is opened with the no-argument {@code getInotifyEventStream()}.
     */
    private volatile long lastTxId = -1;

    /** Path filter configuration built once per scheduling in {@link #onSchedule(ProcessContext)}. */
    private NotificationConfig notificationConfig;

    static final PropertyDescriptor POLL_DURATION = new PropertyDescriptor.Builder()
            .name("Poll Duration")
            .displayName("Poll Duration")
            .description("The time before the polling method returns with the next batch of events if they exist. It may exceed this amount of time by up to the time required for an RPC to the NameNode.")
            .defaultValue("1 second")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    static final PropertyDescriptor HDFS_PATH_TO_WATCH = new PropertyDescriptor.Builder()
            .name("HDFS Path to Watch")
            .displayName("HDFS Path to Watch")
            .description("The HDFS path to get event notifications for. This property accepts both expression language and regular expressions. This will be evaluated during the OnScheduled phase.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
            .name("Ignore Hidden Files")
            .displayName("Ignore Hidden Files")
            .description("If true and the final component of the path associated with a given event starts with a '.' then that event will not be processed.")
            .required(true)
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();

    static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder()
            .name("Event Types to Filter On")
            .displayName("Event Types to Filter On")
            .description("A comma-separated list of event types to process. Valid event types are: append, close, create, metadata, rename, and unlink. Case does not matter.")
            .addValidator(new EventTypeValidator())
            .required(true)
            .defaultValue("append, close, create, metadata, rename, unlink")
            .build();

    static final PropertyDescriptor NUMBER_OF_RETRIES_FOR_POLL = new PropertyDescriptor.Builder()
            .name("IOException Retries During Event Polling")
            .displayName("IOException Retries During Event Polling")
            .description("According to the HDFS admin API for event polling it is good to retry at least a few times. This number defines how many times the poll will be retried if it throws an IOException.")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .required(true)
            .defaultValue("3")
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A flow file with updated information about a specific event will be sent to this relationship.")
            .build();

    private static final Set<Relationship> RELATIONSHIPS =
            Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(REL_SUCCESS)));

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Holds the path filter derived from the processor's path and hidden-file properties. */
    private static final class NotificationConfig {
        private final PathFilter pathFilter;

        NotificationConfig(ProcessContext context) {
            // The watch path is evaluated (expression language) once and compiled as a regex.
            final String pathRegex = context.getProperty(HDFS_PATH_TO_WATCH).evaluateAttributeExpressions().getValue();
            final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
            this.pathFilter = new NotificationEventPathFilter(Pattern.compile(pathRegex), ignoreHidden);
        }

        PathFilter getPathFilter() {
            return pathFilter;
        }
    }

    /**
     * Returns the inherited Hadoop properties followed by this processor's own properties.
     *
     * @return an unmodifiable list of supported property descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(this.properties);
        descriptors.add(POLL_DURATION);
        descriptors.add(HDFS_PATH_TO_WATCH);
        descriptors.add(IGNORE_HIDDEN_FILES);
        descriptors.add(EVENT_TYPES);
        descriptors.add(NUMBER_OF_RETRIES_FOR_POLL);
        return Collections.unmodifiableList(descriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Builds the path filter configuration from the current property values.
     *
     * @param processContext the context supplying the path and hidden-file properties
     */
    @OnScheduled
    public void onSchedule(ProcessContext processContext) {
        this.notificationConfig = new NotificationConfig(processContext);
    }

    /**
     * Restores the last transaction id from cluster state and polls one event batch. It emits a
     * flow file for every matching event, then persists the new transaction id.
     *
     * <p>On a {@link MissingEventsException} the transaction id is reset to -1 and that reset is
     * persisted. On an I/O failure or interruption the processor yields, and state is left untouched.
     *
     * @param context the process context
     * @param session the process session
     * @throws ProcessException if an {@link HdfsAdmin} instance cannot be created
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String storedTxId;
        try {
            storedTxId = session.getState(Scope.CLUSTER).get(LAST_TX_ID);
        } catch (IOException e) {
            getLogger().error("Unable to retrieve last transaction ID. Must retrieve last processed transaction ID before processing can occur.", e);
            context.yield();
            return;
        }
        if (storedTxId != null && !storedTxId.isEmpty()) {
            lastTxId = Long.parseLong(storedTxId);
        }

        try {
            final int retries = context.getProperty(NUMBER_OF_RETRIES_FOR_POLL).asInteger();
            final TimeUnit pollUnit = TimeUnit.MICROSECONDS;
            final DFSInotifyEventInputStream eventStream = openEventStream();
            final long pollDuration = context.getProperty(POLL_DURATION).asTimePeriod(pollUnit);
            final EventBatch eventBatch = getEventBatch(eventStream, pollDuration, pollUnit, retries);

            if (eventBatch != null && eventBatch.getEvents() != null) {
                final Event[] events = eventBatch.getEvents();
                if (events.length > 0) {
                    transferFlowFiles(session, createFlowFiles(context, session, events));
                }
                lastTxId = eventBatch.getTxid();
            }
        } catch (MissingEventsException e) {
            lastTxId = -1L;
            getLogger().error("Unable to get notification information. Setting transaction id to -1. This may cause some events to get missed. Please see javadoc for org.apache.hadoop.hdfs.client.HdfsAdmin#getInotifyEventStream", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the framework thread can observe the interruption.
            Thread.currentThread().interrupt();
            getLogger().error("Unable to get notification information", e);
            context.yield();
            return;
        } catch (IOException e) {
            getLogger().error("Unable to get notification information", e);
            context.yield();
            return;
        }
        updateClusterStateForTxId(session);
    }

    /** Opens an inotify stream, resuming from {@link #lastTxId} when one is known. */
    private DFSInotifyEventInputStream openEventStream() throws IOException {
        final HdfsAdmin hdfsAdmin = getHdfsAdmin();
        return lastTxId == -1 ? hdfsAdmin.getInotifyEventStream() : hdfsAdmin.getInotifyEventStream(lastTxId);
    }

    /** Creates one flow file per event that passes the event-type and path filters. */
    private List<FlowFile> createFlowFiles(ProcessContext context, ProcessSession session, Event[] events) {
        // The configured types are parsed once per batch instead of once per event.
        final Set<Event.EventType> typesToProcess = getEventTypesToProcess(context);
        final List<FlowFile> flowFiles = new ArrayList<>(events.length);
        for (final Event event : events) {
            if (toProcessEvent(typesToProcess, event)) {
                getLogger().debug("Creating flow file for event: {}.", new Object[]{event});
                flowFiles.add(createFlowFile(session, event));
            }
        }
        return flowFiles;
    }

    /** Creates a flow file with the event attributes and the JSON-serialized event as content. */
    private FlowFile createFlowFile(ProcessSession session, final Event event) {
        FlowFile flowFile = session.create();
        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), GetHDFSFileInfo.APPLICATION_JSON);
        flowFile = session.putAttribute(flowFile, EVENT_TYPE_ATTRIBUTE, event.getEventType().name());
        flowFile = session.putAttribute(flowFile, EVENT_PATH_ATTRIBUTE, getPath(event));
        return session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                out.write(OBJECT_MAPPER.writeValueAsBytes(event));
            }
        });
    }

    /** Transfers each flow file to success and records a RECEIVE provenance event for it. */
    private void transferFlowFiles(ProcessSession session, List<FlowFile> flowFiles) {
        for (final FlowFile flowFile : flowFiles) {
            final String path = flowFile.getAttribute(EVENT_PATH_ATTRIBUTE);
            // NOTE(review): an absolute path such as "/a/b" becomes "hdfs://a/b", which puts the first
            // path segment in the URI authority position. This is kept as-is so existing provenance
            // URIs do not change; confirm whether "hdfs://" + path was intended.
            final String transitUri = path.startsWith("/") ? "hdfs:/" + path : "hdfs://" + path;
            getLogger().debug("Transferring flow file {} and creating provenance event with URI {}.", new Object[]{flowFile, transitUri});
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, transitUri);
        }
    }

    /**
     * Polls the stream for one event batch, retrying on {@link IOException}.
     *
     * @param eventStream the inotify stream to poll
     * @param duration    maximum time to wait per poll
     * @param timeUnit    unit of {@code duration}
     * @param retries     number of retries after the first failed attempt; up to
     *                    {@code retries + 1} polls are made in total
     * @return the polled batch, possibly {@code null} if none arrived in time
     * @throws IOException            if every attempt fails
     * @throws InterruptedException   if interrupted while polling
     * @throws MissingEventsException if the requested events are no longer available
     */
    private EventBatch getEventBatch(DFSInotifyEventInputStream eventStream, long duration, TimeUnit timeUnit, int retries)
            throws IOException, InterruptedException, MissingEventsException {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return eventStream.poll(duration, timeUnit);
            } catch (IOException e) {
                if (attempt > retries) {
                    getLogger().debug("Failed to poll for event batch. Reached max retry times.", e);
                    throw e;
                }
                getLogger().debug("Attempt {} failed to poll for event batch. Retrying.", new Object[]{attempt});
            }
        }
    }

    /** Writes {@link #lastTxId} into cluster state, preserving any other stored keys. */
    private void updateClusterStateForTxId(ProcessSession session) {
        try {
            final Map<String, String> newState = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
            newState.put(LAST_TX_ID, String.valueOf(lastTxId));
            session.setState(newState, Scope.CLUSTER);
        } catch (IOException e) {
            getLogger().warn("Failed to update cluster state for last txId. It is possible data replication may occur.", e);
        }
    }

    /**
     * Creates a new {@link HdfsAdmin} for the configured file system.
     *
     * @return a new admin client
     * @throws ProcessException wrapping the {@link IOException} if the client cannot be created
     */
    protected HdfsAdmin getHdfsAdmin() {
        try {
            return new HdfsAdmin(getFileSystem().getUri(), getFileSystem().getConf());
        } catch (IOException e) {
            getLogger().error("Unable to get an instance of HDFS admin. You must be an HDFS super user to view HDFS events.");
            throw new ProcessException(e);
        }
    }

    /**
     * Parses the comma-separated {@link #EVENT_TYPES} property into event types. Matching is
     * case-insensitive, and tokens that name no known type are ignored.
     */
    private Set<Event.EventType> getEventTypesToProcess(ProcessContext context) {
        final Set<Event.EventType> types = EnumSet.noneOf(Event.EventType.class);
        for (final String token : context.getProperty(EVENT_TYPES).getValue().split(",")) {
            final String name = token.trim();
            for (final Event.EventType type : Event.EventType.values()) {
                if (name.equalsIgnoreCase(type.name())) {
                    types.add(type);
                }
            }
        }
        return types;
    }

    /** Returns true when the event's type is selected and its path passes the path filter. */
    private boolean toProcessEvent(Set<Event.EventType> typesToProcess, Event event) {
        return typesToProcess.contains(event.getEventType())
                && notificationConfig.getPathFilter().accept(new Path(getPath(event)));
    }

    /**
     * Returns the path an event refers to. For renames this is the source path.
     *
     * @throws IllegalArgumentException if the event or its type is null, or the type is unsupported
     */
    private String getPath(Event event) {
        if (event == null || event.getEventType() == null) {
            throw new IllegalArgumentException("Event and event type must not be null.");
        }
        switch (event.getEventType()) {
            case CREATE:
                return ((Event.CreateEvent) event).getPath();
            case CLOSE:
                return ((Event.CloseEvent) event).getPath();
            case APPEND:
                return ((Event.AppendEvent) event).getPath();
            case RENAME:
                return ((Event.RenameEvent) event).getSrcPath();
            case METADATA:
                return ((Event.MetadataUpdateEvent) event).getPath();
            case UNLINK:
                return ((Event.UnlinkEvent) event).getPath();
            default:
                throw new IllegalArgumentException("Unsupported event type.");
        }
    }
}
