package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.eventhubs.client.EventHubClient;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubSender;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that reads the full content of each incoming FlowFile into memory and sends it
 * as a single message to an Azure Event Hub.
 *
 * <p>One {@link EventHubSender} is created per configured concurrent task when the processor
 * is scheduled. The senders are kept in a bounded queue; each {@code onTrigger} invocation
 * takes one sender from the queue for the duration of the call and puts it back afterwards,
 * so a sender is never used by two invocations at the same time.
 */
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
public class PutAzureEventHub extends AbstractProcessor {
    // NOTE(review): neither delivery-mode value is referenced by any property or method in this
    // class; they are kept because they are package-visible and may be used elsewhere.
    static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(2), "Persistent", "This mode indicates that the Event Hub server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost.");
    static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(1), "Non-Persistent", "This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted.");

    /** Name of the Event Hub that messages are sent to. */
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the Azure Event Hub to send to").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    /** Azure namespace that the Event Hub belongs to; also used to build the provenance transit URI. */
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(true).build();
    /** Name of the Shared Access Policy used to authenticate. */
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(true).build();
    /** Primary key of the Shared Access Policy; marked sensitive. */
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).sensitive(true).required(true).build();

    /** Destination for FlowFiles whose content was sent without an {@link EventHubException}. */
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.").build();
    /** Destination for FlowFiles whose send raised an {@link EventHubException}. */
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.").build();

    /**
     * Pool of senders shared between concurrent {@code onTrigger} invocations. Replaced with a
     * bounded queue in {@link #setupClient(ProcessContext)} and drained in {@link #tearDown()}.
     */
    private volatile BlockingQueue<EventHubSender> senderQueue = new LinkedBlockingQueue<EventHubSender>();

    /**
     * Returns the relationships this processor can route to.
     *
     * @return a new set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }

    /**
     * Returns the properties supported by this processor.
     *
     * @return a new list holding the superclass' descriptors followed by the Event Hub name,
     *         namespace, access policy name and access policy primary key
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        descriptors.add(EVENT_HUB_NAME);
        descriptors.add(NAMESPACE);
        descriptors.add(ACCESS_POLICY);
        descriptors.add(POLICY_PRIMARY_KEY);
        return descriptors;
    }

    /**
     * Creates the Event Hub client from the configured properties and fills the sender pool
     * with one sender per concurrent task.
     *
     * @param processContext supplies the property values and the maximum number of concurrent tasks
     * @throws EventHubException if the client or one of the senders cannot be created
     */
    @OnScheduled
    public final void setupClient(ProcessContext processContext) throws EventHubException {
        final String policyName = processContext.getProperty(ACCESS_POLICY).getValue();
        final String policyKey = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
        final String namespace = processContext.getProperty(NAMESPACE).getValue();
        final String eventHubName = processContext.getProperty(EVENT_HUB_NAME).getValue();
        final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName);

        // The queue capacity equals the number of senders created, so offer() below always succeeds.
        final int maxConcurrentTasks = processContext.getMaxConcurrentTasks();
        this.senderQueue = new LinkedBlockingQueue<EventHubSender>(maxConcurrentTasks);
        for (int i = 0; i < maxConcurrentTasks; i++) {
            // A null partition id is passed through to the client unchanged.
            this.senderQueue.offer(client.createPartitionSender((String) null));
        }
    }

    /** Closes every sender currently held in the pool, leaving the pool empty. */
    @OnStopped
    public void tearDown() {
        EventHubSender sender;
        while ((sender = this.senderQueue.poll()) != null) {
            sender.close();
        }
    }

    /**
     * Sends the content of one FlowFile to the Event Hub.
     *
     * <p>The FlowFile content is read fully into a byte array and sent with a pooled sender.
     * On success a provenance SEND event is reported and the FlowFile goes to
     * {@link #REL_SUCCESS}; if the send raises an {@link EventHubException} the FlowFile is
     * penalized and routed to {@link #REL_FAILURE}. The sender is always returned to the pool.
     *
     * @param processContext supplies the namespace and Event Hub name used in the transit URI
     * @param processSession session used to fetch, read and transfer the FlowFile
     * @throws ProcessException if no sender is available in the pool
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        final StopWatch stopWatch = new StopWatch(true);
        final EventHubSender sender = this.senderQueue.poll();
        if (sender == null) {
            // poll() yields null on an empty pool. Fail with a descriptive error rather than a
            // NullPointerException, and never try to put null back into the queue.
            throw new ProcessException("No EventHubSender is available to send " + flowFile + "; the sender pool is empty");
        }
        try {
            // The whole content is buffered in memory; the int cast means sizes above
            // Integer.MAX_VALUE are not supported.
            final byte[] buffer = new byte[(int) flowFile.getSize()];
            processSession.read(flowFile, new InputStreamCallback() {
                public void process(InputStream inputStream) throws IOException {
                    StreamUtils.fillBuffer(inputStream, buffer);
                }
            });
            try {
                sender.send(buffer);
                final String transitUri = "amqps://" + processContext.getProperty(NAMESPACE).getValue() + ".servicebus.windows.net/" + processContext.getProperty(EVENT_HUB_NAME).getValue();
                processSession.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                processSession.transfer(flowFile, REL_SUCCESS);
            } catch (EventHubException e) {
                getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, e}, e);
                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            }
        } finally {
            // Single return point for the sender on success, send failure and unexpected errors.
            this.senderQueue.offer(sender);
        }
    }
}
