package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that publishes the content of each incoming FlowFile as one event to an Azure Event Hub.
 *
 * <p>Event Hub clients are created lazily on the first {@code onTrigger} call (one client per
 * configured concurrent task) and are shared through {@link #senderQueue}. All clients run on a
 * single scheduled thread pool of {@value #EVENT_HUB_CLIENT_THREAD_POOL_SIZE} threads that is
 * created together with the clients and released again in {@link #tearDown()}.
 *
 * <p>FlowFile content is read fully into a byte array before it is sent, and every FlowFile
 * attribute is copied into the event's user properties.
 */
@CapabilityDescription("Sends the contents of a FlowFile to Windows Azure Event Hubs. Note: the content of the FlowFile will be buffered into memory before being sent, so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available. Also please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
public class PutAzureEventHub extends AbstractProcessor {

    /** Number of threads in the scheduled pool shared by all Event Hub clients of this processor. */
    private static final int EVENT_HUB_CLIENT_THREAD_POOL_SIZE = 4;

    /** Batch size used when the {@link #MAX_BATCH_SIZE} property value cannot be parsed as an int. */
    private static final int DEFAULT_MAX_BATCH_SIZE = 100;

    /** Upper bound, in seconds, that {@link #tearDown()} waits for the clients to finish closing. */
    private static final long CLIENT_CLOSE_TIMEOUT_SECONDS = 30L;

    /** User agent string assigned to the Event Hub client implementation before a client is created. */
    private static final String USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("The name of the event hub to send to")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();

    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Send claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();

    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Primary Key")
            .description("The primary key of the shared access policy")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .sensitive(true)
            .required(true)
            .build();

    static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
            .name("partitioning-key-attribute-name")
            .displayName("Partitioning Key Attribute Name")
            .description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
            .defaultValue((String) null)
            .build();

    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("max-batch-size")
            .displayName("Maximum batch size")
            .description("Maximum count of flow files being processed in one batch.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .defaultValue("100")
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile that is successfully sent to the event hubs will be transferred to this Relationship.")
            .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Any FlowFile that could not be sent to the event hub will be transferred to this Relationship.")
            .build();

    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(EVENT_HUB_NAME);
        descriptors.add(NAMESPACE);
        descriptors.add(ACCESS_POLICY);
        descriptors.add(POLICY_PRIMARY_KEY);
        descriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
        descriptors.add(MAX_BATCH_SIZE);
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> supportedRelationships = new HashSet<>();
        supportedRelationships.add(REL_SUCCESS);
        supportedRelationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /** Pool of ready-to-use clients; a client is polled for each send and offered back afterwards. */
    private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue<>();

    /**
     * Thread pool handed to every client on creation. Only read or written while holding this
     * object's monitor ({@link #populateSenderQueue(ProcessContext)} and {@link #tearDown()}).
     */
    private ScheduledExecutorService executor;

    /**
     * @return the unmodifiable set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of properties supported by this processor
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Scheduling hook that intentionally does nothing: the clients are created lazily by
     * {@link #populateSenderQueue(ProcessContext)} on the first trigger.
     *
     * @param processContext the context the processor is being scheduled with (unused)
     */
    @OnScheduled
    public final void setupClient(ProcessContext processContext) throws ProcessException {
    }

    /**
     * Closes every pooled client and releases the shared thread pool.
     *
     * <p>{@code EventHubClient.close()} is asynchronous, so the method waits up to
     * {@value #CLIENT_CLOSE_TIMEOUT_SECONDS} seconds for the closes to finish before shutting the
     * executor down; stopping the pool first could reject work the closes still need to schedule.
     * The executor is shut down even when a close fails or the wait times out, so no threads
     * outlive the processor.
     */
    @OnStopped
    public synchronized void tearDown() {
        final List<CompletableFuture<Void>> pendingCloses = new ArrayList<>();
        try {
            EventHubClient sender;
            while ((sender = senderQueue.poll()) != null) {
                final CompletableFuture<Void> closing = sender.close();
                // Defensive: a stubbed/mocked client may return no future at all.
                if (closing != null) {
                    pendingCloses.add(closing);
                }
            }
            CompletableFuture.allOf(pendingCloses.toArray(new CompletableFuture<?>[0]))
                    .get(CLIENT_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            getLogger().warn("Interrupted while waiting for EventHubClients to close", e);
        } catch (final ExecutionException | TimeoutException e) {
            getLogger().warn("EventHubClients did not close cleanly", e);
        } finally {
            if (executor != null) {
                executor.shutdownNow();
                executor = null;
            }
        }
    }

    /**
     * Pulls up to {@link #MAX_BATCH_SIZE} FlowFiles from the session, starts an asynchronous send
     * for each of them and then waits for all sends to finish and routes the FlowFiles.
     *
     * @param processContext provides the processor configuration
     * @param processSession session the FlowFiles are read from and transferred with
     * @throws ProcessException if waiting for the batch fails (see
     *         {@link #waitForAllFutures(ProcessContext, ProcessSession, StopWatch, BlockingQueue)})
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        populateSenderQueue(processContext);

        final StopWatch stopWatch = new StopWatch(true);
        final String partitioningKeyAttributeName = processContext.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
        final int maxBatchSize = NumberUtils.toInt(processContext.getProperty(MAX_BATCH_SIZE).getValue(), DEFAULT_MAX_BATCH_SIZE);
        final List<FlowFile> flowFiles = processSession.get(maxBatchSize);

        final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> pendingSends = new LinkedBlockingQueue<>();
        for (final FlowFile flowFile : flowFiles) {
            if (flowFile != null) {
                pendingSends.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, processSession));
            }
        }

        waitForAllFutures(processContext, processSession, stopWatch, pendingSends);
    }

    /**
     * Waits for every pending send and routes the corresponding FlowFile: successful sends get a
     * provenance SEND event and go to {@link #REL_SUCCESS}; failed sends are logged, penalized and
     * go to {@link #REL_FAILURE}.
     *
     * <p>The wait uses {@code get()} so that an interrupt of the processor thread is honoured
     * instead of blocking until the send completes.
     *
     * @param processContext used to build the transit URI for provenance reporting
     * @param processSession session owning the FlowFiles
     * @param stopWatch      stop watch started at the beginning of the batch; its elapsed time is
     *                       reported as the transfer duration
     * @param futureQueue    the sends to wait for
     * @throws ProcessException if a future fails, is cancelled, or the wait is interrupted; the
     *         session is rolled back before the exception is thrown
     */
    protected void waitForAllFutures(ProcessContext processContext, ProcessSession processSession, StopWatch stopWatch,
            BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue) {
        try {
            for (final CompletableFuture<FlowFileResultCarrier<Relationship>> future : futureQueue) {
                final FlowFileResultCarrier<Relationship> carrier = future.get();
                if (carrier == null) {
                    continue;
                }

                final FlowFile flowFile = carrier.getFlowFile();
                if (REL_SUCCESS.equals(carrier.getResult())) {
                    final String transitUri = "amqps://" + processContext.getProperty(NAMESPACE).getValue()
                            + ".servicebus.windows.net/" + processContext.getProperty(EVENT_HUB_NAME).getValue();
                    processSession.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                    processSession.transfer(flowFile, REL_SUCCESS);
                } else {
                    final Throwable exception = carrier.getException();
                    getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, exception}, exception);
                    processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                }
            }
        } catch (InterruptedException | CancellationException | CompletionException | ExecutionException e) {
            getLogger().error("Batch processing failed", e);
            processSession.rollback();
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag for the framework thread.
                Thread.currentThread().interrupt();
            }
            throw new ProcessException("Batch processing failed", e);
        }
    }

    /**
     * Reads the FlowFile content into memory and starts sending it as one event.
     *
     * @param flowFile                     the FlowFile to send
     * @param partitioningKeyAttributeName name of the attribute holding the partitioning key; when
     *                                     blank no partitioning key is used
     * @param processSession               session used to read the FlowFile content
     * @return a future that yields a carrier with {@link #REL_SUCCESS} when the send succeeds and
     *         a carrier with {@link #REL_FAILURE} plus the cause when it fails or when no client
     *         could be obtained
     */
    protected CompletableFuture<FlowFileResultCarrier<Relationship>> handleFlowFile(FlowFile flowFile,
            String partitioningKeyAttributeName, ProcessSession processSession) {
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        processSession.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));

        final String partitioningKey = StringUtils.isNotBlank(partitioningKeyAttributeName)
                ? flowFile.getAttribute(partitioningKeyAttributeName)
                : null;

        // Copy the attributes so the event owns its own mutable property map.
        final Map<String, String> attributes = flowFile.getAttributes();
        final Map<String, Object> userProperties = attributes == null
                ? Collections.<String, Object>emptyMap()
                : new HashMap<String, Object>(attributes);

        try {
            return sendMessage(buffer, partitioningKey, userProperties)
                    .thenApplyAsync(ignored -> new FlowFileResultCarrier<Relationship>(flowFile, REL_SUCCESS))
                    .exceptionally(cause -> new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, cause));
        } catch (final ProcessException e) {
            return CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, e));
        }
    }

    /**
     * Creates the client pool if it is currently empty: one client per configured concurrent task,
     * all sharing one scheduled thread pool.
     *
     * <p>The method is synchronized so that concurrent triggers cannot each observe an empty queue
     * and build their own set of clients and threads. An already existing executor is reused
     * rather than replaced, so refilling the queue never abandons a running thread pool.
     *
     * @param processContext provides the connection properties and the number of concurrent tasks
     * @throws ProcessException if a client cannot be created
     */
    protected synchronized void populateSenderQueue(ProcessContext processContext) {
        if (!senderQueue.isEmpty()) {
            return;
        }

        final int numThreads = processContext.getMaxConcurrentTasks();
        final BlockingQueue<EventHubClient> clients = new LinkedBlockingQueue<>(numThreads);
        senderQueue = clients;
        if (executor == null) {
            executor = Executors.newScheduledThreadPool(EVENT_HUB_CLIENT_THREAD_POOL_SIZE);
        }

        final String policyName = processContext.getProperty(ACCESS_POLICY).getValue();
        final String policyKey = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
        final String namespace = processContext.getProperty(NAMESPACE).getValue();
        final String eventHubName = processContext.getProperty(EVENT_HUB_NAME).getValue();

        for (int i = 0; i < numThreads; i++) {
            final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey, executor);
            // Overriding implementations may return null; such entries are simply skipped.
            if (client != null) {
                clients.offer(client);
            }
        }
    }

    /**
     * Creates a single Event Hub client, blocking until it is connected.
     *
     * @param namespace       Event Hub namespace
     * @param eventHubName    name of the Event Hub
     * @param policyName      shared access policy name
     * @param policyKey       shared access policy key
     * @param executorService thread pool the client runs on
     * @return the created client
     * @throws ProcessException if the client cannot be created or the connection string is invalid
     */
    protected EventHubClient createEventHubClient(String namespace, String eventHubName, String policyName, String policyKey,
            ScheduledExecutorService executorService) throws ProcessException {
        try {
            EventHubClientImpl.USER_AGENT = USER_AGENT;
            return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executorService);
        } catch (IOException | EventHubException | IllegalConnectionStringFormatException e) {
            getLogger().error("Failed to create EventHubClient due to {}", e);
            throw new ProcessException(e);
        }
    }

    /**
     * Builds the Event Hub connection string from its individual parts.
     *
     * @param namespace    Event Hub namespace
     * @param eventHubName name of the Event Hub
     * @param policyName   shared access policy name
     * @param policyKey    shared access policy key
     * @return the connection string produced by {@link ConnectionStringBuilder}
     */
    protected String getConnectionString(String namespace, String eventHubName, String policyName, String policyKey) {
        return new ConnectionStringBuilder()
                .setNamespaceName(namespace)
                .setEventHubName(eventHubName)
                .setSasKeyName(policyName)
                .setSasKey(policyKey)
                .toString();
    }

    /**
     * Sends one event using a client borrowed from the pool.
     *
     * <p>The client is handed back to the pool in a {@code finally} block, so it is not lost when
     * building the event or starting the send throws.
     *
     * @param buffer         event payload
     * @param partitioningKey partitioning key to send with; when blank the event is sent without one
     * @param userProperties properties copied into the event's property map; may be {@code null}
     * @return the future returned by the client's {@code send} call
     * @throws ProcessException if no client is available in the pool
     */
    protected CompletableFuture<Void> sendMessage(byte[] buffer, String partitioningKey, Map<String, Object> userProperties)
            throws ProcessException {
        final EventHubClient sender = senderQueue.poll();
        if (sender == null) {
            throw new ProcessException("No EventHubClients are configured for sending");
        }

        try {
            final EventData eventData = EventData.create(buffer);
            final Map<String, Object> properties = eventData.getProperties();
            if (userProperties != null && properties != null) {
                properties.putAll(userProperties);
            }
            return StringUtils.isNotBlank(partitioningKey)
                    ? sender.send(eventData, partitioningKey)
                    : sender.send(eventData);
        } finally {
            senderQueue.offer(sender);
        }
    }
}
