package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile. Note: Please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@WritesAttributes({@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.class */
public class GetAzureEventHub extends AbstractProcessor {
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the Azure Event Hub to pull messages from").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder().name("Service Bus Endpoint").description("To support Namespaces in non-standard Host URIs ( not .servicebus.windows.net,  ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{".servicebus.windows.net", ".servicebus.chinacloudapi.cn"}).defaultValue(".servicebus.windows.net").required(true).build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).sensitive(true).required(true).build();
    static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder().name("Number of Event Hub Partitions").description("The number of partitions that the Event Hub has. Only this number of partitions will be used, so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder().name("Event Hub Consumer Group").description("The name of the Event Hub Consumer Group to use when pulling events").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("$Default").required(true).build();
    static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder().name("Event Hub Message Enqueue Time").description("A timestamp (ISO-8601 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages should have been enqueued in the EventHub to start reading from").addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder().name("Partition Recivier Fetch Size").description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder().name("Partiton Receiver Timeout (millseconds)").description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)").addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.").build();
    private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap();
    private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue();
    private volatile Instant configuredEnqueueTime;
    private volatile int receiverFetchSize;
    private volatile Duration receiverFetchTimeout;
    private EventHubClient eventHubClient;
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;
    private ScheduledExecutorService executor;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    protected void setupReceiver(String str, ScheduledExecutorService scheduledExecutorService) throws ProcessException {
        try {
            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
            this.eventHubClient = EventHubClient.createSync(str, scheduledExecutorService);
        } catch (IOException | EventHubException e) {
            throw new ProcessException(e);
        }
    }

    PartitionReceiver getReceiver(ProcessContext processContext, String str) throws IOException, EventHubException, ExecutionException, InterruptedException {
        PartitionReceiver partitionReceiver = this.partitionToReceiverMap.get(str);
        if (partitionReceiver != null) {
            return partitionReceiver;
        }
        synchronized (this) {
            PartitionReceiver partitionReceiver2 = this.partitionToReceiverMap.get(str);
            if (partitionReceiver2 != null) {
                return partitionReceiver2;
            }
            PartitionReceiver partitionReceiver3 = (PartitionReceiver) this.eventHubClient.createReceiver(processContext.getProperty(CONSUMER_GROUP).getValue(), str, EventPosition.fromEnqueuedTime(this.configuredEnqueueTime == null ? Instant.now() : this.configuredEnqueueTime)).get();
            partitionReceiver3.setReceiveTimeout(this.receiverFetchTimeout == null ? Duration.ofMillis(60000L) : this.receiverFetchTimeout);
            this.partitionToReceiverMap.put(str, partitionReceiver3);
            return partitionReceiver3;
        }
    }

    protected Iterable<EventData> receiveEvents(ProcessContext processContext, String str) throws ProcessException {
        try {
            return (Iterable) getReceiver(processContext, str).receive(this.receiverFetchSize).get();
        } catch (EventHubException | IOException | InterruptedException | ExecutionException e) {
            throw new ProcessException(e);
        }
    }

    @OnStopped
    public void tearDown() throws ProcessException {
        for (PartitionReceiver partitionReceiver : this.partitionToReceiverMap.values()) {
            if (null != partitionReceiver) {
                partitionReceiver.close();
            }
        }
        this.partitionToReceiverMap.clear();
        try {
            if (null != this.eventHubClient) {
                this.eventHubClient.closeSync();
            }
            this.executor.shutdown();
        } catch (EventHubException e) {
            throw new ProcessException(e);
        }
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws ProcessException, URISyntaxException {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        for (int i = 0; i < processContext.getProperty(NUM_PARTITIONS).asInteger().intValue(); i++) {
            linkedBlockingQueue.add(String.valueOf(i));
        }
        this.partitionNames = linkedBlockingQueue;
        String value = processContext.getProperty(ACCESS_POLICY).getValue();
        String value2 = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
        String value3 = processContext.getProperty(NAMESPACE).getValue();
        String value4 = processContext.getProperty(EVENT_HUB_NAME).getValue();
        String value5 = processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        if (processContext.getProperty(ENQUEUE_TIME).isSet()) {
            this.configuredEnqueueTime = Instant.parse(processContext.getProperty(ENQUEUE_TIME).toString());
        } else {
            this.configuredEnqueueTime = null;
        }
        if (processContext.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
            this.receiverFetchSize = processContext.getProperty(RECEIVER_FETCH_SIZE).asInteger().intValue();
        } else {
            this.receiverFetchSize = 100;
        }
        if (processContext.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
            this.receiverFetchTimeout = Duration.ofMillis(processContext.getProperty(RECEIVER_FETCH_TIMEOUT).asLong().longValue());
        } else {
            this.receiverFetchTimeout = null;
        }
        this.executor = Executors.newScheduledThreadPool(4);
        setupReceiver(new ConnectionStringBuilder().setEndpoint(new URI("amqps://" + value3 + value5)).setEventHubName(value4).setSasKeyName(value).setSasKey(value2).toString(), this.executor);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        BlockingQueue<String> blockingQueue = this.partitionNames;
        String poll = blockingQueue.poll();
        if (poll == null) {
            getLogger().debug("No partitions available");
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        try {
            Iterable<EventData> receiveEvents = receiveEvents(processContext, poll);
            if (receiveEvents == null) {
                return;
            }
            for (EventData eventData : receiveEvents) {
                if (null != eventData) {
                    HashMap hashMap = new HashMap();
                    FlowFile create = processSession.create();
                    EventData.SystemProperties systemProperties = eventData.getSystemProperties();
                    if (null != systemProperties) {
                        hashMap.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
                        hashMap.put("eventhub.offset", systemProperties.getOffset());
                        hashMap.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
                    }
                    hashMap.put("eventhub.name", processContext.getProperty(EVENT_HUB_NAME).getValue());
                    hashMap.put("eventhub.partition", poll);
                    FlowFile write = processSession.write(processSession.putAllAttributes(create, hashMap), outputStream -> {
                        outputStream.write(eventData.getBytes());
                    });
                    processSession.transfer(write, REL_SUCCESS);
                    processSession.getProvenanceReporter().receive(write, "amqps://" + processContext.getProperty(NAMESPACE).getValue() + processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue() + "/" + processContext.getProperty(EVENT_HUB_NAME).getValue() + "/ConsumerGroups/" + processContext.getProperty(CONSUMER_GROUP).getValue() + "/Partitions/" + poll, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                }
            }
            blockingQueue.offer(poll);
        } finally {
            blockingQueue.offer(poll);
        }
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.add(EVENT_HUB_NAME);
        arrayList.add(SERVICE_BUS_ENDPOINT);
        arrayList.add(NAMESPACE);
        arrayList.add(ACCESS_POLICY);
        arrayList.add(POLICY_PRIMARY_KEY);
        arrayList.add(NUM_PARTITIONS);
        arrayList.add(CONSUMER_GROUP);
        arrayList.add(ENQUEUE_TIME);
        arrayList.add(RECEIVER_FETCH_SIZE);
        arrayList.add(RECEIVER_FETCH_TIMEOUT);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
