package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@WritesAttributes({@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"), @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"), @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"), @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.class */
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
    private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
    private static final Set<Relationship> RELATIONSHIPS;
    private static final Set<Relationship> RECORD_RELATIONSHIPS;
    private volatile EventProcessorHost eventProcessorHost;
    private volatile ProcessSessionFactory processSessionFactory;
    private volatile RecordReaderFactory readerFactory;
    private volatile RecordSetWriterFactory writerFactory;
    private volatile String namespaceName;
    private volatile boolean isRecordReaderSet = false;
    private volatile boolean isRecordWriterSet = false;
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("event-hub-namespace").displayName("Event Hub Namespace").description("The namespace that the Azure Event Hubs is assigned to. This is generally equal to <Event Hub Names>-ns.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("event-hub-name").displayName("Event Hub Name").description("The name of the event hub to pull messages from.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder().name("event-hub-shared-access-policy-name").displayName("Shared Access Policy Name").description("The name of the shared access policy. This policy must have Listen claims.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY).name("event-hub-shared-access-policy-primary-key").build();
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder().name("event-hub-consumer-group").displayName("Consumer Group").description("The name of the consumer group to use.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("$Default").required(true).build();
    static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder().name("event-hub-consumer-hostname").displayName("Consumer Hostname").description("The hostname of this event hub consumer instance. If not specified, an unique identifier is generated in 'nifi-<UUID>' format.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for reading received messages. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema.").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use for serializing Records to an output FlowFile. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema. If not specified, each message will create a FlowFile.").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue("start-of-stream", "Start of stream", "Read from the oldest message retained in the stream.");
    static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue("end-of-stream", "End of stream", "Ignore old retained messages even if exist, start reading new ones from now.");
    static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder().name("event-hub-initial-offset").displayName("Initial Offset").description("Specify where to start receiving messages if offset is not stored in Azure Storage.").required(true).allowableValues(new AllowableValue[]{INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM}).defaultValue(INITIAL_OFFSET_END_OF_STREAM.getValue()).build();
    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder().name("event-hub-prefetch-count").displayName("Prefetch Count").defaultValue("The number of messages to fetch from the event hub before processing. This parameter affects throughput. The more prefetch count, the better throughput in general, but consumes more resources (RAM). NOTE: Even though the event hub client API provides this option, actual number of messages can be pre-fetched is depend on the Event Hubs server implementation. It is reported that only one event is received at a time in certain situation. https://github.com/Azure/azure-event-hubs-java/issues/125").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("300").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("event-hub-batch-size").displayName("Batch Size").description("The number of messages to process within a NiFi session. This parameter affects throughput and consistency. NiFi commits its session and Event Hubs checkpoints after processing this number of messages. If NiFi session is committed, but fails to create an Event Hubs checkpoint, then it is possible that the same messages will be received again. The higher number, the higher throughput, but possibly less consistent.").defaultValue("10").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder().name("event-hub-message-receive-timeout").displayName("Message Receive Timeout").description("The amount of time this consumer should wait to receive the Prefetch Count before returning.").defaultValue("1 min").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder().name(AzureStorageUtils.STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME).displayName("Storage Account Name").description("Name of the Azure Storage account to store event hub consumer group state.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder().name(AzureStorageUtils.STORAGE_ACCOUNT_KEY_PROPERTY_DESCRIPTOR_NAME).displayName("Storage Account Key").description("The Azure Storage account key to store event hub consumer group state.").sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder().name("storage-container-name").displayName("Storage Container Name").description("Name of the Azure Storage container to store the event hub consumer group state. If not specified, event hub name is used.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(false).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles received from Event Hub.").build();
    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a message from event hub cannot be parsed using the configured Record Reader or failed to be written by the configured Record Writer, the contents of the message will be routed to this Relationship as its own individual FlowFile.").build();
    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME));

    /* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub$EventProcessor.class */
    public class EventProcessor implements IEventProcessor {
        public EventProcessor() {
        }

        public void onOpen(PartitionContext partitionContext) throws Exception {
            ConsumeAzureEventHub.this.getLogger().info("Consumer group {} opened partition {} of {}", new Object[]{partitionContext.getConsumerGroupName(), partitionContext.getPartitionId(), partitionContext.getEventHubPath()});
        }

        public void onClose(PartitionContext partitionContext, CloseReason closeReason) throws Exception {
            ConsumeAzureEventHub.this.getLogger().info("Consumer group {} closed partition {} of {}. reason={}", new Object[]{partitionContext.getConsumerGroupName(), partitionContext.getPartitionId(), partitionContext.getEventHubPath(), closeReason});
        }

        public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) throws Exception {
            ProcessSession createSession = ConsumeAzureEventHub.this.processSessionFactory.createSession();
            try {
                StopWatch stopWatch = new StopWatch(true);
                if (ConsumeAzureEventHub.this.readerFactory == null || ConsumeAzureEventHub.this.writerFactory == null) {
                    writeFlowFiles(partitionContext, iterable, createSession, stopWatch);
                } else {
                    writeRecords(partitionContext, iterable, createSession, stopWatch);
                }
                createSession.commit();
                partitionContext.checkpoint();
            } catch (Exception e) {
                ConsumeAzureEventHub.this.getLogger().error("Unable to fully process received message due to " + e, e);
                createSession.rollback();
            }
        }

        private void putEventHubAttributes(Map<String, String> map, String str, String str2, EventData eventData) {
            EventData.SystemProperties systemProperties = eventData.getSystemProperties();
            if (null != systemProperties) {
                map.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
                map.put("eventhub.offset", systemProperties.getOffset());
                map.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
            }
            map.putAll(AzureEventHubUtils.getApplicationProperties(eventData));
            map.put("eventhub.name", str);
            map.put("eventhub.partition", str2);
        }

        private void writeFlowFiles(PartitionContext partitionContext, Iterable<EventData> iterable, ProcessSession processSession, StopWatch stopWatch) {
            String eventHubPath = partitionContext.getEventHubPath();
            String partitionId = partitionContext.getPartitionId();
            String consumerGroupName = partitionContext.getConsumerGroupName();
            iterable.forEach(eventData -> {
                FlowFile create = processSession.create();
                HashMap hashMap = new HashMap();
                putEventHubAttributes(hashMap, eventHubPath, partitionId, eventData);
                transferTo(ConsumeAzureEventHub.REL_SUCCESS, processSession, stopWatch, eventHubPath, partitionId, consumerGroupName, processSession.write(processSession.putAllAttributes(create, hashMap), outputStream -> {
                    outputStream.write(eventData.getBytes());
                }));
            });
        }

        private void transferTo(Relationship relationship, ProcessSession processSession, StopWatch stopWatch, String str, String str2, String str3, FlowFile flowFile) {
            processSession.transfer(flowFile, relationship);
            processSession.getProvenanceReporter().receive(flowFile, "amqps://" + ConsumeAzureEventHub.this.namespaceName + ".servicebus.windows.net/" + str + "/ConsumerGroups/" + str3 + "/Partitions/" + str2, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        }

        private void writeRecords(PartitionContext partitionContext, Iterable<EventData> iterable, ProcessSession processSession, StopWatch stopWatch) throws SchemaNotFoundException, IOException {
            ByteArrayInputStream byteArrayInputStream;
            Throwable th;
            String eventHubPath = partitionContext.getEventHubPath();
            String partitionId = partitionContext.getPartitionId();
            String consumerGroupName = partitionContext.getConsumerGroupName();
            HashMap hashMap = new HashMap();
            hashMap.put("eventhub.name", eventHubPath);
            ComponentLog logger = ConsumeAzureEventHub.this.getLogger();
            FlowFile create = processSession.create();
            HashMap hashMap2 = new HashMap();
            RecordSetWriter recordSetWriter = null;
            EventData eventData = null;
            WriteResult writeResult = null;
            int i = 0;
            OutputStream write = processSession.write(create);
            Throwable th2 = null;
            try {
                for (EventData eventData2 : iterable) {
                    try {
                        byteArrayInputStream = new ByteArrayInputStream(eventData2.getBytes());
                        th = null;
                    } catch (Exception e) {
                        logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer due to " + e, e);
                        FlowFile create2 = processSession.create();
                        processSession.write(create2, outputStream -> {
                            outputStream.write(eventData2.getBytes());
                        });
                        putEventHubAttributes(hashMap2, eventHubPath, partitionId, eventData2);
                        transferTo(ConsumeAzureEventHub.REL_PARSE_FAILURE, processSession, stopWatch, eventHubPath, partitionId, consumerGroupName, processSession.putAllAttributes(create2, hashMap2));
                    }
                    try {
                        try {
                            RecordReader createRecordReader = ConsumeAzureEventHub.this.readerFactory.createRecordReader(hashMap, byteArrayInputStream, r0.length, logger);
                            while (true) {
                                Record nextRecord = createRecordReader.nextRecord();
                                if (nextRecord == null) {
                                    break;
                                }
                                if (recordSetWriter == null) {
                                    recordSetWriter = ConsumeAzureEventHub.this.writerFactory.createWriter(logger, ConsumeAzureEventHub.this.writerFactory.getSchema(hashMap, nextRecord.getSchema()), write, create);
                                    recordSetWriter.beginRecordSet();
                                }
                                writeResult = recordSetWriter.write(nextRecord);
                                i += writeResult.getRecordCount();
                            }
                            eventData = eventData2;
                            if (byteArrayInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        byteArrayInputStream.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    byteArrayInputStream.close();
                                }
                            }
                        } catch (Throwable th4) {
                            th = th4;
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        if (byteArrayInputStream != null) {
                            if (th != null) {
                                try {
                                    byteArrayInputStream.close();
                                } catch (Throwable th6) {
                                    th.addSuppressed(th6);
                                }
                            } else {
                                byteArrayInputStream.close();
                            }
                        }
                        throw th5;
                    }
                }
                if (eventData != null) {
                    putEventHubAttributes(hashMap2, eventHubPath, partitionId, eventData);
                    hashMap2.put("record.count", String.valueOf(i));
                    if (recordSetWriter != null) {
                        recordSetWriter.finishRecordSet();
                        hashMap2.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
                        if (writeResult != null) {
                            hashMap2.putAll(writeResult.getAttributes());
                        }
                        try {
                            recordSetWriter.close();
                        } catch (IOException e2) {
                            logger.warn("Failed to close Record Writer due to {}" + e2, e2);
                        }
                    }
                }
                if (eventData == null) {
                    processSession.remove(create);
                } else {
                    transferTo(ConsumeAzureEventHub.REL_SUCCESS, processSession, stopWatch, eventHubPath, partitionId, consumerGroupName, processSession.putAllAttributes(create, hashMap2));
                }
            } finally {
                if (write != null) {
                    if (0 != 0) {
                        try {
                            write.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        write.close();
                    }
                }
            }
        }

        public void onError(PartitionContext partitionContext, Throwable th) {
            if ((th instanceof ReceiverDisconnectedException) && th.getMessage().startsWith("New receiver with higher epoch of ")) {
                ConsumeAzureEventHub.this.getLogger().info("New receiver took over partition {} of Azure Event Hub {}, consumerGroupName={}, message={}", new Object[]{partitionContext.getPartitionId(), partitionContext.getEventHubPath(), partitionContext.getConsumerGroupName(), th.getMessage()});
            } else {
                ConsumeAzureEventHub.this.getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at partition {}, consumerGroupName={}, exception={}", new Object[]{partitionContext.getEventHubPath(), partitionContext.getPartitionId(), partitionContext.getConsumerGroupName(), th}, th);
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub$EventProcessorFactory.class */
    public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
        public EventProcessorFactory() {
        }

        /* renamed from: createEventProcessor, reason: merged with bridge method [inline-methods] */
        public EventProcessor m6createEventProcessor(PartitionContext partitionContext) throws Exception {
            return new EventProcessor();
        }
    }

    void setProcessSessionFactory(ProcessSessionFactory processSessionFactory) {
        this.processSessionFactory = processSessionFactory;
    }

    void setNamespaceName(String str) {
        this.namespaceName = str;
    }

    public void setReaderFactory(RecordReaderFactory recordReaderFactory) {
        this.readerFactory = recordReaderFactory;
    }

    public void setWriterFactory(RecordSetWriterFactory recordSetWriterFactory) {
        this.writerFactory = recordSetWriterFactory;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return (this.isRecordReaderSet && this.isRecordWriterSet) ? RECORD_RELATIONSHIPS : RELATIONSHIPS;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        ControllerService asControllerService = validationContext.getProperty(RECORD_READER).asControllerService();
        ControllerService asControllerService2 = validationContext.getProperty(RECORD_WRITER).asControllerService();
        if ((asControllerService != null && asControllerService2 == null) || (asControllerService == null && asControllerService2 != null)) {
            arrayList.add(new ValidationResult.Builder().subject("Record Reader and Writer").explanation(String.format("Both %s and %s should be set in order to write FlowFiles as Records.", RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName())).valid(false).build());
        }
        arrayList.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
        return arrayList;
    }

    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (RECORD_READER.equals(propertyDescriptor)) {
            this.isRecordReaderSet = !StringUtils.isEmpty(str2);
        } else if (RECORD_WRITER.equals(propertyDescriptor)) {
            this.isRecordWriterSet = !StringUtils.isEmpty(str2);
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        if (this.eventProcessorHost == null) {
            try {
                registerEventProcessor(processContext);
                this.processSessionFactory = processSessionFactory;
                this.readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
                this.writerFactory = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            } catch (IllegalArgumentException e) {
                throw e;
            } catch (Exception e2) {
                throw new ProcessException("Failed to register the event processor due to " + e2, e2);
            }
        }
        processContext.yield();
    }

    @OnStopped
    public void unregisterEventProcessor(ProcessContext processContext) {
        if (this.eventProcessorHost != null) {
            try {
                this.eventProcessorHost.unregisterEventProcessor();
                this.eventProcessorHost = null;
                this.processSessionFactory = null;
                this.readerFactory = null;
                this.writerFactory = null;
            } catch (Exception e) {
                throw new RuntimeException("Failed to unregister the event processor due to " + e, e);
            }
        }
    }

    private void registerEventProcessor(ProcessContext processContext) throws Exception {
        String sharedAccessSignatureConnectionString;
        String value = processContext.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
        validateRequiredProperty(CONSUMER_GROUP, value);
        this.namespaceName = processContext.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
        validateRequiredProperty(NAMESPACE, this.namespaceName);
        String value2 = processContext.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
        validateRequiredProperty(EVENT_HUB_NAME, value2);
        String value3 = processContext.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
        validateRequiredProperty(STORAGE_ACCOUNT_NAME, value3);
        String value4 = processContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        validateRequiredProperty(STORAGE_ACCOUNT_KEY, value4);
        String orDefault = orDefault(processContext.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(), EventProcessorHost.createHostName("nifi"));
        String orDefault2 = orDefault(processContext.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), value2);
        EventProcessorOptions eventProcessorOptions = new EventProcessorOptions();
        String value5 = processContext.getProperty(INITIAL_OFFSET).getValue();
        if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(value5)) {
            eventProcessorOptions.getClass();
            eventProcessorOptions.setInitialPositionProvider(new EventProcessorOptions.StartOfStreamInitialPositionProvider(eventProcessorOptions));
        } else {
            if (!INITIAL_OFFSET_END_OF_STREAM.getValue().equals(value5)) {
                throw new IllegalArgumentException("Initial offset " + value5 + " is not allowed.");
            }
            eventProcessorOptions.getClass();
            eventProcessorOptions.setInitialPositionProvider(new EventProcessorOptions.EndOfStreamInitialPositionProvider(eventProcessorOptions));
        }
        Integer asInteger = processContext.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
        if (asInteger != null && asInteger.intValue() > 0) {
            eventProcessorOptions.setPrefetchCount(asInteger.intValue());
        }
        Integer asInteger2 = processContext.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        if (asInteger2 != null && asInteger2.intValue() > 0) {
            eventProcessorOptions.setMaxBatchSize(asInteger2.intValue());
        }
        eventProcessorOptions.setReceiveTimeOut(Duration.ofMillis(processContext.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
        String format = String.format(FORMAT_STORAGE_CONNECTION_STRING, value3, value4);
        if (processContext.getProperty(USE_MANAGED_IDENTITY).asBoolean().booleanValue()) {
            sharedAccessSignatureConnectionString = AzureEventHubUtils.getManagedIdentityConnectionString(this.namespaceName, value2);
        } else {
            String value6 = processContext.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
            validateRequiredProperty(ACCESS_POLICY_NAME, value6);
            String value7 = processContext.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
            validateRequiredProperty(POLICY_PRIMARY_KEY, value7);
            sharedAccessSignatureConnectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(this.namespaceName, value2, value6, value7);
        }
        this.eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder.newBuilder(orDefault, value).useAzureStorageCheckpointLeaseManager(format, orDefault2, (String) null).useEventHubConnectionString(sharedAccessSignatureConnectionString, value2).build();
        eventProcessorOptions.setExceptionNotification(exceptionReceivedEventArgs -> {
            getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at consumer group {} and partition {}, action={}, hostname={}, exception={}", new Object[]{value2, value, exceptionReceivedEventArgs.getPartitionId(), exceptionReceivedEventArgs.getAction(), exceptionReceivedEventArgs.getHostname()}, exceptionReceivedEventArgs.getException());
        });
        this.eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), eventProcessorOptions).get();
    }

    private String orDefault(String str, String str2) {
        return StringUtils.isEmpty(str) ? str2 : str;
    }

    private void validateRequiredProperty(PropertyDescriptor propertyDescriptor, String str) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException(String.format("'%s' is required, but not specified.", propertyDescriptor.getDisplayName()));
        }
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        RELATIONSHIPS = Collections.unmodifiableSet(hashSet);
        hashSet.add(REL_PARSE_FAILURE);
        RECORD_RELATIONSHIPS = Collections.unmodifiableSet(hashSet);
    }
}
