package org.apache.nifi.processors.riemann;

import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.RiemannClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true, description = "These values will be attached to the Riemann event as a custom attribute", value = "Any value or expression")
@CapabilityDescription("Send events to Riemann (http://riemann.io) when FlowFiles pass through this processor. You can use events to notify Riemann that a FlowFile passed through, or you can attach a more meaningful metric, such as, the time a FlowFile took to get to this processor. All attributes attached to events support the NiFi Expression Language.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"riemann", "monitoring", "metrics"})
@SupportsBatching
/* loaded from: input_file:org/apache/nifi/processors/riemann/PutRiemann.class */
public class PutRiemann extends AbstractProcessor {
    protected volatile Transport transport;
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Metrics successfully written to Riemann").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Metrics which failed to write to Riemann").build();
    public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder().name("Riemann Address").description("Hostname of Riemann server").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder().name("Riemann Port").description("Port that Riemann is listening on").required(true).defaultValue("5555").addValidator(StandardValidators.PORT_VALIDATOR).build();
    public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder().name("Transport Protocol").description("Transport protocol to speak to Riemann in").required(true).allowableValues(new Transport[]{Transport.TCP, Transport.UDP}).defaultValue("TCP").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("Batch size for incoming FlowFiles").required(false).defaultValue("100").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder().name("Service").description("Name of service associated to this event (e.g. FTP File Fetched)").required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder().name("State").description("State of service associated to this event in string form (e.g. ok, warning, foo)").required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder().name("Time").description("Time of event in unix epoch seconds (long), default: (current time)").required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder().name("Host").description("A hostname associated to this event (e.g. nifi-app1)").required(false).defaultValue("${hostname()}").expressionLanguageSupported(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder().name("TTL").description("Floating point value in seconds until Riemann considers this event as \"expired\"").required(false).addValidator(Validator.VALID).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder().name("Metric").description("Floating point number associated to this event").required(false).addValidator(Validator.VALID).expressionLanguageSupported(true).build();
    public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder().name("Description").description("Description associated to the event").required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder().name("Tags").description("Comma separated list of tags associated to the event").required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder().name("Timeout").description("Timeout in milliseconds when writing events to Riemann").required(true).defaultValue("1000").addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).build();
    private static final Set<Relationship> RELATIONSHIPS = new HashSet();
    private static final List<PropertyDescriptor> LOCAL_PROPERTIES = new ArrayList();
    protected volatile RiemannClient riemannClient = null;
    private volatile List<PropertyDescriptor> customAttributes = new ArrayList();
    private volatile int batchSize = -1;
    private volatile long writeTimeout = 1000;

    /* loaded from: input_file:org/apache/nifi/processors/riemann/PutRiemann$FlowFileToEvent.class */
    private static class FlowFileToEvent {
        private FlowFileToEvent() {
        }

        protected static Proto.Event fromAttributes(ProcessContext processContext, List<PropertyDescriptor> list, FlowFile flowFile) {
            Proto.Event.Builder newBuilder = Proto.Event.newBuilder();
            PropertyValue evaluateAttributeExpressions = processContext.getProperty(PutRiemann.ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions.getValue())) {
                newBuilder.setService(evaluateAttributeExpressions.getValue());
            }
            PropertyValue evaluateAttributeExpressions2 = processContext.getProperty(PutRiemann.ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions2.getValue())) {
                newBuilder.setDescription(evaluateAttributeExpressions2.getValue());
            }
            PropertyValue evaluateAttributeExpressions3 = processContext.getProperty(PutRiemann.ATTR_METRIC).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions3.getValue())) {
                newBuilder.setMetricF(evaluateAttributeExpressions3.asFloat().floatValue());
            }
            PropertyValue evaluateAttributeExpressions4 = processContext.getProperty(PutRiemann.ATTR_TIME).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions4.getValue())) {
                newBuilder.setTime(evaluateAttributeExpressions4.asLong().longValue());
            }
            PropertyValue evaluateAttributeExpressions5 = processContext.getProperty(PutRiemann.ATTR_STATE).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions5.getValue())) {
                newBuilder.setState(evaluateAttributeExpressions5.getValue());
            }
            PropertyValue evaluateAttributeExpressions6 = processContext.getProperty(PutRiemann.ATTR_TTL).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions6.getValue())) {
                newBuilder.setTtl(evaluateAttributeExpressions6.asFloat().floatValue());
            }
            PropertyValue evaluateAttributeExpressions7 = processContext.getProperty(PutRiemann.ATTR_HOST).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions7.getValue())) {
                newBuilder.setHost(evaluateAttributeExpressions7.getValue());
            }
            PropertyValue evaluateAttributeExpressions8 = processContext.getProperty(PutRiemann.ATTR_TAGS).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(evaluateAttributeExpressions8.getValue())) {
                for (String str : evaluateAttributeExpressions8.getValue().split(",")) {
                    newBuilder.addTags(str.trim());
                }
            }
            for (PropertyDescriptor propertyDescriptor : list) {
                PropertyValue evaluateAttributeExpressions9 = processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile);
                if (StringUtils.isNotBlank(evaluateAttributeExpressions9.getValue())) {
                    newBuilder.addAttributes(Proto.Attribute.newBuilder().setKey(propertyDescriptor.getName()).setValue(evaluateAttributeExpressions9.getValue()).build());
                }
            }
            return newBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/riemann/PutRiemann$Transport.class */
    public enum Transport {
        TCP,
        UDP
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return LOCAL_PROPERTIES;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().name(str).expressionLanguageSupported(true).addValidator(Validator.VALID).required(false).dynamic(true).build();
    }

    @OnStopped
    public final void cleanUpClient() {
        if (this.riemannClient != null) {
            this.riemannClient.close();
        }
        this.riemannClient = null;
        this.batchSize = -1;
        this.customAttributes.clear();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws ProcessException {
        if (this.batchSize == -1) {
            this.batchSize = processContext.getProperty(BATCH_SIZE).asInteger().intValue();
        }
        if (this.riemannClient == null || !this.riemannClient.isConnected()) {
            this.transport = Transport.valueOf(processContext.getProperty(TRANSPORT_PROTOCOL).getValue());
            String trim = processContext.getProperty(RIEMANN_HOST).getValue().trim();
            int intValue = processContext.getProperty(RIEMANN_PORT).asInteger().intValue();
            this.writeTimeout = processContext.getProperty(TIMEOUT).asLong().longValue();
            RiemannClient riemannClient = null;
            try {
                switch (this.transport) {
                    case TCP:
                        riemannClient = RiemannClient.tcp(trim, intValue);
                        break;
                    case UDP:
                        riemannClient = RiemannClient.udp(trim, intValue);
                        break;
                }
                riemannClient.connect();
                this.riemannClient = riemannClient;
            } catch (IOException e) {
                if (riemannClient != null) {
                    riemannClient.close();
                }
                processContext.yield();
                throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", trim, Integer.valueOf(intValue), this.transport, e.getMessage()));
            }
        }
        if (this.customAttributes.size() == 0) {
            for (Map.Entry entry : processContext.getProperties().entrySet()) {
                if (!getSupportedPropertyDescriptors().contains(entry.getKey())) {
                    this.customAttributes.add(entry.getKey());
                }
            }
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        if (this.riemannClient == null || !this.riemannClient.isConnected()) {
            cleanUpClient();
            onScheduled(processContext);
        }
        List<FlowFile> list = processSession.get(this.batchSize);
        ArrayList arrayList = new ArrayList(list.size());
        ArrayList arrayList2 = new ArrayList(list.size());
        for (FlowFile flowFile : list) {
            try {
                arrayList2.add(FlowFileToEvent.fromAttributes(processContext, this.customAttributes, flowFile));
                arrayList.add(flowFile);
            } catch (NumberFormatException e) {
                getLogger().warn("Unable to create Riemann event.", e);
                processSession.transfer(flowFile, REL_FAILURE);
            }
        }
        try {
            if (this.transport != Transport.TCP) {
                this.riemannClient.sendEvents(arrayList2);
            } else if (((Proto.Msg) this.riemannClient.sendEvents(arrayList2).deref(this.writeTimeout, TimeUnit.MILLISECONDS)) == null) {
                processContext.yield();
                throw new ProcessException("Timed out writing to Riemann!");
            }
            this.riemannClient.flush();
            processSession.transfer(arrayList, REL_SUCCESS);
            processSession.commit();
        } catch (Exception e2) {
            processContext.yield();
            processSession.transfer(list);
            processSession.commit();
            throw new ProcessException("Failed writing to Riemann\n" + e2.getMessage());
        }
    }

    static {
        RELATIONSHIPS.add(REL_SUCCESS);
        RELATIONSHIPS.add(REL_FAILURE);
        LOCAL_PROPERTIES.add(RIEMANN_HOST);
        LOCAL_PROPERTIES.add(RIEMANN_PORT);
        LOCAL_PROPERTIES.add(TRANSPORT_PROTOCOL);
        LOCAL_PROPERTIES.add(TIMEOUT);
        LOCAL_PROPERTIES.add(BATCH_SIZE);
        LOCAL_PROPERTIES.add(ATTR_DESCRIPTION);
        LOCAL_PROPERTIES.add(ATTR_SERVICE);
        LOCAL_PROPERTIES.add(ATTR_STATE);
        LOCAL_PROPERTIES.add(ATTR_METRIC);
        LOCAL_PROPERTIES.add(ATTR_TTL);
        LOCAL_PROPERTIES.add(ATTR_TAGS);
        LOCAL_PROPERTIES.add(ATTR_HOST);
        LOCAL_PROPERTIES.add(ATTR_TIME);
    }
}
