package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.class */
public class PutAzureEventHub extends AbstractProcessor {
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the Azure Event Hub to send to").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).sensitive(true).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.").build();
    private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @OnScheduled
    public final void setupClient(ProcessContext processContext) throws ProcessException {
        String value = processContext.getProperty(ACCESS_POLICY).getValue();
        String value2 = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
        String value3 = processContext.getProperty(NAMESPACE).getValue();
        String value4 = processContext.getProperty(EVENT_HUB_NAME).getValue();
        int maxConcurrentTasks = processContext.getMaxConcurrentTasks();
        this.senderQueue = new LinkedBlockingQueue(maxConcurrentTasks);
        for (int i = 0; i < maxConcurrentTasks; i++) {
            EventHubClient createEventHubClient = createEventHubClient(value3, value4, value, value2);
            if (null != createEventHubClient) {
                this.senderQueue.offer(createEventHubClient);
            }
        }
    }

    @OnStopped
    public void tearDown() {
        while (true) {
            EventHubClient poll = this.senderQueue.poll();
            if (poll == null) {
                return;
            } else {
                poll.close();
            }
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        byte[] bArr = new byte[(int) flowFile.getSize()];
        processSession.read(flowFile, inputStream -> {
            StreamUtils.fillBuffer(inputStream, bArr);
        });
        try {
            sendMessage(bArr);
            processSession.getProvenanceReporter().send(flowFile, "amqps://" + processContext.getProperty(NAMESPACE).getValue() + ".servicebus.windows.net/" + processContext.getProperty(EVENT_HUB_NAME).getValue(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            processSession.transfer(flowFile, REL_SUCCESS);
        } catch (ProcessException e) {
            getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, e}, e);
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
        }
    }

    protected EventHubClient createEventHubClient(String str, String str2, String str3, String str4) throws ProcessException {
        try {
            return (EventHubClient) EventHubClient.createFromConnectionString(getConnectionString(str, str2, str3, str4)).get();
        } catch (IOException | InterruptedException | ExecutionException | ServiceBusException | IllegalConnectionStringFormatException e) {
            getLogger().error("Failed to create EventHubClient due to {}", e);
            throw new ProcessException(e);
        }
    }

    protected String getConnectionString(String str, String str2, String str3, String str4) {
        return new ConnectionStringBuilder(str, str2, str3, str4).toString();
    }

    protected void sendMessage(byte[] bArr) throws ProcessException {
        EventHubClient poll = this.senderQueue.poll();
        try {
            if (null == poll) {
                throw new ProcessException("No EventHubClients are configured for sending");
            }
            try {
                poll.sendSync(new EventData(bArr));
                this.senderQueue.offer(poll);
            } catch (ServiceBusException e) {
                throw new ProcessException("Caught exception trying to send message to eventbus", e);
            }
        } catch (Throwable th) {
            this.senderQueue.offer(poll);
            throw th;
        }
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.add(EVENT_HUB_NAME);
        arrayList.add(NAMESPACE);
        arrayList.add(ACCESS_POLICY);
        arrayList.add(POLICY_PRIMARY_KEY);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
